diff --git a/Cargo.lock b/Cargo.lock index 1df009c2df32..afb51230247d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1940,18 +1940,18 @@ dependencies = [ "console-subscriber", "lazy_static", "once_cell", - "opentelemetry 0.17.0", - "opentelemetry-jaeger", + "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.21.0", "parking_lot 0.12.1", "prometheus", - "rand", - "rs-snowflake", "serde", "tokio", "tracing", "tracing-appender", "tracing-futures", - "tracing-log", + "tracing-log 0.1.4", "tracing-opentelemetry", "tracing-subscriber", ] @@ -3315,7 +3315,7 @@ dependencies = [ "moka", "object-store", "openmetrics-parser", - "opentelemetry-proto", + "opentelemetry-proto 0.3.0", "operator", "partition", "prometheus", @@ -3599,7 +3599,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7eb2e78be7a104d2582fbea0bcb1e019407da702#7eb2e78be7a104d2582fbea0bcb1e019407da702" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=25429306d0379ad29211a062a81da2554a0208ab#25429306d0379ad29211a062a81da2554a0208ab" dependencies = [ "prost 0.12.1", "serde", @@ -5419,23 +5419,18 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "opentelemetry" -version = "0.17.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", + "futures-core", + "futures-sink", + "indexmap 2.0.2", "js-sys", - "lazy_static", - "percent-encoding", - "pin-project", - "rand", + "once_cell", + 
"pin-project-lite", "thiserror", - "tokio", - "tokio-stream", + "urlencoding", ] [[package]] @@ -5454,18 +5449,22 @@ dependencies = [ ] [[package]] -name = "opentelemetry-jaeger" -version = "0.16.0" +name = "opentelemetry-otlp" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229" +checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", - "lazy_static", - "opentelemetry 0.17.0", + "futures-core", + "http", + "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", + "opentelemetry-proto 0.4.0", "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.21.0", + "prost 0.11.9", "thiserror", - "thrift 0.15.0", "tokio", + "tonic 0.9.2", ] [[package]] @@ -5473,19 +5472,31 @@ name = "opentelemetry-proto" version = "0.3.0" source = "git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02#33841b38dda79b15f2024952be5f32533325ca02" dependencies = [ - "opentelemetry 0.21.0", - "opentelemetry_sdk", + "opentelemetry 0.21.0 (git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02)", + "opentelemetry_sdk 0.20.0", "prost 0.12.1", "tonic 0.10.2", ] +[[package]] +name = "opentelemetry-proto" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1" +dependencies = [ + "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", + "opentelemetry_sdk 0.21.0", + "prost 0.11.9", + "tonic 0.9.2", +] + [[package]] name = "opentelemetry-semantic-conventions" -version = "0.9.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" +checksum = 
"f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" dependencies = [ - "opentelemetry 0.17.0", + "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5500,11 +5511,33 @@ dependencies = [ "futures-util", "glob", "once_cell", - "opentelemetry 0.21.0", + "opentelemetry 0.21.0 (git+https://github.com/waynexia/opentelemetry-rust.git?rev=33841b38dda79b15f2024952be5f32533325ca02)", + "ordered-float 4.1.1", + "percent-encoding", + "rand", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5b3ce3f5705e2ae493be467a0b23be4bc563c193cdb7713e55372c89a906b34" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", "ordered-float 4.1.1", "percent-encoding", "rand", "thiserror", + "tokio", + "tokio-stream", ] [[package]] @@ -5589,15 +5622,6 @@ dependencies = [ "zstd 0.12.4", ] -[[package]] -name = "ordered-float" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" -dependencies = [ - "num-traits", -] - [[package]] name = "ordered-float" version = "2.10.1" @@ -5767,7 +5791,7 @@ dependencies = [ "paste", "seq-macro", "snap", - "thrift 0.17.0", + "thrift", "tokio", "twox-hash", "zstd 0.12.4", @@ -7212,12 +7236,6 @@ dependencies = [ "serde", ] -[[package]] -name = "rs-snowflake" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e60ef3b82994702bbe4e134d98aadca4b49ed04440148985678d415c68127666" - [[package]] name = "rsa" version = "0.6.1" @@ -8263,7 +8281,7 @@ dependencies = [ "once_cell", "openmetrics-parser", "opensrv-mysql", - "opentelemetry-proto", + "opentelemetry-proto 0.3.0", "parking_lot 
0.12.1", "pgwire", "pin-project", @@ -9307,7 +9325,7 @@ dependencies = [ "num_cpus", "object-store", "once_cell", - "opentelemetry-proto", + "opentelemetry-proto 0.3.0", "operator", "partition", "paste", @@ -9397,28 +9415,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - -[[package]] -name = "thrift" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e" -dependencies = [ - "byteorder", - "integer-encoding", - "log", - "ordered-float 1.1.1", - "threadpool", -] - [[package]] name = "thrift" version = "0.17.0" @@ -9952,18 +9948,33 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-opentelemetry" -version = "0.17.4" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" dependencies = [ + "js-sys", "once_cell", - "opentelemetry 0.17.0", + "opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)", + "opentelemetry_sdk 0.21.0", + "smallvec", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", + "web-time", ] [[package]] @@ -9981,7 +9992,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.4", ] [[package]] @@ -10545,6 +10556,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] 
+name = "web-time" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57099a701fb3a8043f993e8228dc24229c7b942e2b009a1b962e54489ba1d3bf" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" diff --git a/Cargo.toml b/Cargo.toml index 173cb687cebc..9c9db1117653 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7eb2e78be7a104d2582fbea0bcb1e019407da702" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "25429306d0379ad29211a062a81da2554a0208ab" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 666430d48175..a35c79a38e76 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -109,9 +109,7 @@ vector_cache_size = "512MB" sst_write_buffer_size = "8MB" -# Log options +# Log options, see `standalone.example.toml` # [logging] -# Specify logs directory. 
# dir = "/tmp/greptimedb/logs" -# Specify the log level [info | debug | error | warn] # level = "info" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index e4bd41a616c4..623b5f5452e0 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -158,3 +158,7 @@ global_write_buffer_size = "1GB" # dir = "/tmp/greptimedb/logs" # Specify the log level [info | debug | error | warn] # level = "info" +# whether to enable tracing, default is false +# enable_otlp_tracing = false +# tracing exporter endpoint with format `ip:port`, we use grpc otlp as exporter, default endpoint is `localhost:4317` +# otlp_endpoint = "localhost:4317" diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 46f1b1e9e6ec..ca7d3ae96ba3 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -78,7 +78,7 @@ async fn run() { let logical = mock_logical_plan(); event!(Level::INFO, "plan size: {:#?}", logical.len()); - let result = db.logical_plan(logical, 0).await.unwrap(); + let result = db.logical_plan(logical).await.unwrap(); event!(Level::INFO, "result: {:#?}", result); } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 563164d59360..ebbb4fa60a44 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -29,6 +29,7 @@ use common_query::Output; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::RecordBatchStreamAdaptor; use common_telemetry::logging; +use common_telemetry::tracing_context::W3cTrace; use futures_util::StreamExt; use prost::Message; use snafu::{ensure, ResultExt}; @@ -147,21 +148,21 @@ impl Database { async fn handle(&self, request: Request) -> Result { let mut client = self.client.make_database_client()?.inner; - let request = self.to_rpc_request(request, 0); + let request = self.to_rpc_request(request); let response = client.handle(request).await?.into_inner(); from_grpc_response(response) } #[inline] - fn 
to_rpc_request(&self, request: Request, trace_id: u64) -> GreptimeRequest { + fn to_rpc_request(&self, request: Request) -> GreptimeRequest { GreptimeRequest { header: Some(RequestHeader { catalog: self.catalog.clone(), schema: self.schema.clone(), authorization: self.ctx.auth_header.clone(), dbname: self.dbname.clone(), - trace_id, - span_id: 0, + // TODO(Taylor-lagrange): add client grpc tracing + tracing_context: W3cTrace::new(), }), request: Some(request), } @@ -172,23 +173,17 @@ impl Database { S: AsRef, { let _timer = metrics::METRIC_GRPC_SQL.start_timer(); - self.do_get( - Request::Query(QueryRequest { - query: Some(Query::Sql(sql.as_ref().to_string())), - }), - 0, - ) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::Sql(sql.as_ref().to_string())), + })) .await } - pub async fn logical_plan(&self, logical_plan: Vec, trace_id: u64) -> Result { + pub async fn logical_plan(&self, logical_plan: Vec) -> Result { let _timer = metrics::METRIC_GRPC_LOGICAL_PLAN.start_timer(); - self.do_get( - Request::Query(QueryRequest { - query: Some(Query::LogicalPlan(logical_plan)), - }), - trace_id, - ) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::LogicalPlan(logical_plan)), + })) .await } @@ -200,68 +195,53 @@ impl Database { step: &str, ) -> Result { let _timer = metrics::METRIC_GRPC_PROMQL_RANGE_QUERY.start_timer(); - self.do_get( - Request::Query(QueryRequest { - query: Some(Query::PromRangeQuery(PromRangeQuery { - query: promql.to_string(), - start: start.to_string(), - end: end.to_string(), - step: step.to_string(), - })), - }), - 0, - ) + self.do_get(Request::Query(QueryRequest { + query: Some(Query::PromRangeQuery(PromRangeQuery { + query: promql.to_string(), + start: start.to_string(), + end: end.to_string(), + step: step.to_string(), + })), + })) .await } pub async fn create(&self, expr: CreateTableExpr) -> Result { let _timer = metrics::METRIC_GRPC_CREATE_TABLE.start_timer(); - self.do_get( - Request::Ddl(DdlRequest { - expr: 
Some(DdlExpr::CreateTable(expr)), - }), - 0, - ) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(expr)), + })) .await } pub async fn alter(&self, expr: AlterExpr) -> Result { let _timer = metrics::METRIC_GRPC_ALTER.start_timer(); - self.do_get( - Request::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(expr)), - }), - 0, - ) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(expr)), + })) .await } pub async fn drop_table(&self, expr: DropTableExpr) -> Result { let _timer = metrics::METRIC_GRPC_DROP_TABLE.start_timer(); - self.do_get( - Request::Ddl(DdlRequest { - expr: Some(DdlExpr::DropTable(expr)), - }), - 0, - ) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(expr)), + })) .await } pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result { let _timer = metrics::METRIC_GRPC_TRUNCATE_TABLE.start_timer(); - self.do_get( - Request::Ddl(DdlRequest { - expr: Some(DdlExpr::TruncateTable(expr)), - }), - 0, - ) + self.do_get(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::TruncateTable(expr)), + })) .await } - async fn do_get(&self, request: Request, trace_id: u64) -> Result { + async fn do_get(&self, request: Request) -> Result { // FIXME(paomian): should be added some labels for metrics let _timer = metrics::METRIC_GRPC_DO_GET.start_timer(); - let request = self.to_rpc_request(request, trace_id); + let request = self.to_rpc_request(request); let request = Ticket { ticket: request.encode_to_vec().into(), }; diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index cf520690cfa1..efaf26ec0fc9 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -208,7 +208,8 @@ async fn main() -> Result<()> { }; common_telemetry::set_panic_hook(); - let _guard = common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts); + let _guard = + common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts, opts.node_id()); // Report app version as 
gauge. APP_VERSION diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 5173d0336831..e99a5403fb5e 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -176,7 +176,7 @@ impl Repl { .encode(&plan) .context(SubstraitEncodeLogicalPlanSnafu)?; - self.database.logical_plan(plan.to_vec(), 0).await + self.database.logical_plan(plan.to_vec()).await } else { self.database.sql(&sql).await } diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 3908eda60fc8..ace707a7c57b 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -133,6 +133,15 @@ impl Options { Ok(opts) } + + pub fn node_id(&self) -> Option { + match self { + Options::Metasrv(_) | Options::Cli(_) => None, + Options::Datanode(opt) => opt.node_id.map(|x| x.to_string()), + Options::Frontend(opt) => opt.node_id.clone(), + Options::Standalone(opt) => opt.frontend.node_id.clone(), + } + } } #[cfg(test)] diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 4ec389199f91..0c3327167037 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::v1::meta::Partition; +use common_telemetry::tracing_context::W3cTrace; use store_api::storage::TableId; use table::metadata::RawTableInfo; @@ -34,6 +35,7 @@ pub mod utils; #[derive(Debug, Default)] pub struct ExecutorContext { pub cluster_id: Option, + pub tracing_context: Option, } #[async_trait::async_trait] diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index ae215c83b6ee..5d3f0e447ce8 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -26,6 +26,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, }; +use common_telemetry::tracing_context::TracingContext; use common_telemetry::{debug, info}; use futures::future; 
use serde::{Deserialize, Serialize}; @@ -207,7 +208,7 @@ impl AlterTableProcedure { let request = self.create_alter_region_request(region_id)?; let request = RegionRequest { header: Some(RegionRequestHeader { - trace_id: common_telemetry::trace_id().unwrap_or_default(), + tracing_context: TracingContext::from_current_span().to_w3c(), ..Default::default() }), body: Some(region_request::Body::Alter(request)), diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 2d0190e957ff..54632e9b3f05 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::info; +use common_telemetry::tracing_context::TracingContext; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; @@ -199,7 +200,7 @@ impl CreateTableProcedure { for request in requests { let request = RegionRequest { header: Some(RegionRequestHeader { - trace_id: common_telemetry::trace_id().unwrap_or_default(), + tracing_context: TracingContext::from_current_span().to_w3c(), ..Default::default() }), body: Some(request), diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 98f9849bde5b..b32243eced47 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -22,6 +22,7 @@ use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, }; +use common_telemetry::tracing_context::TracingContext; use common_telemetry::{debug, info}; use futures::future::join_all; use serde::{Deserialize, Serialize}; @@ -157,7 +158,7 @@ impl DropTableProcedure { let request = 
RegionRequest { header: Some(RegionRequestHeader { - trace_id: common_telemetry::trace_id().unwrap_or_default(), + tracing_context: TracingContext::from_current_span().to_w3c(), ..Default::default() }), body: Some(region_request::Body::Drop(PbDropRegionRequest { diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 0cff39f362cf..ec5a7897cd63 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -21,6 +21,7 @@ use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, }; use common_telemetry::debug; +use common_telemetry::tracing_context::TracingContext; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -154,7 +155,7 @@ impl TruncateTableProcedure { let request = RegionRequest { header: Some(RegionRequestHeader { - trace_id: common_telemetry::trace_id().unwrap_or_default(), + tracing_context: TracingContext::from_current_span().to_w3c(), ..Default::default() }), body: Some(region_request::Body::Truncate(PbTruncateRegionRequest { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 48fb6519c907..3d3cd5ae1c1f 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -15,7 +15,8 @@ use std::sync::Arc; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; -use common_telemetry::info; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::{info, tracing}; use snafu::{OptionExt, ResultExt}; use crate::cache_invalidator::CacheInvalidatorRef; @@ -140,6 +141,7 @@ impl DdlManager { }) } + #[tracing::instrument(skip_all)] pub async fn submit_alter_table_task( &self, cluster_id: u64, @@ -156,6 +158,7 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] pub async fn 
submit_create_table_task( &self, cluster_id: u64, @@ -172,6 +175,7 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] pub async fn submit_drop_table_task( &self, cluster_id: u64, @@ -194,6 +198,7 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] pub async fn submit_truncate_table_task( &self, cluster_id: u64, @@ -383,21 +388,31 @@ impl DdlTaskExecutor for DdlManager { ctx: &ExecutorContext, request: SubmitDdlTaskRequest, ) -> Result { - let cluster_id = ctx.cluster_id.unwrap_or_default(); - info!("Submitting Ddl task: {:?}", request.task); - match request.task { - CreateTable(create_table_task) => { - handle_create_table_task(self, cluster_id, create_table_task).await - } - DropTable(drop_table_task) => { - handle_drop_table_task(self, cluster_id, drop_table_task).await - } - AlterTable(alter_table_task) => { - handle_alter_table_task(self, cluster_id, alter_table_task).await - } - TruncateTable(truncate_table_task) => { - handle_truncate_table_task(self, cluster_id, truncate_table_task).await + let span = ctx + .tracing_context + .as_ref() + .map(TracingContext::from_w3c) + .unwrap_or(TracingContext::from_current_span()) + .attach(tracing::info_span!("DdlManager::submit_ddl_task")); + async move { + let cluster_id = ctx.cluster_id.unwrap_or_default(); + info!("Submitting Ddl task: {:?}", request.task); + match request.task { + CreateTable(create_table_task) => { + handle_create_table_task(self, cluster_id, create_table_task).await + } + DropTable(drop_table_task) => { + handle_drop_table_task(self, cluster_id, drop_table_task).await + } + AlterTable(alter_table_task) => { + handle_alter_table_task(self, cluster_id, alter_table_task).await + } + TruncateTable(truncate_table_task) => { + handle_truncate_table_task(self, cluster_id, truncate_table_task).await + } } } + .trace(span) + .await } } diff --git a/src/common/procedure/src/local.rs 
b/src/common/procedure/src/local.rs index a4266328376a..7bb7732bfcdf 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -23,7 +23,8 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; use common_runtime::{RepeatedTask, TaskFunction}; -use common_telemetry::{info, logging}; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::{info, logging, tracing}; use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; @@ -452,9 +453,19 @@ impl LocalManager { DuplicateProcedureSnafu { procedure_id }, ); + let tracing_context = TracingContext::from_current_span(); + let _handle = common_runtime::spawn_bg(async move { // Run the root procedure. - runner.run().await; + // The task was moved to another runtime for execution. + // In order not to interrupt tracing, a span needs to be created to continue tracing the current task. 
+ runner + .run() + .trace( + tracing_context + .attach(tracing::info_span!("LocalManager::submit_root_procedure")), + ) + .await; }); Ok(watcher) diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index cc5fe5423267..6d5e7a96ffcf 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -14,20 +14,19 @@ common-error.workspace = true console-subscriber = { version = "0.1", optional = true } lazy_static.workspace = true once_cell.workspace = true -opentelemetry = { version = "0.17", default-features = false, features = [ +opentelemetry = { version = "0.21.0", default-features = false, features = [ "trace", - "rt-tokio", ] } -opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] } +opentelemetry-semantic-conventions = "0.13.0" +opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] } parking_lot = { version = "0.12" } prometheus.workspace = true -rand.workspace = true -rs-snowflake = "0.6" serde.workspace = true tokio.workspace = true tracing = "0.1" tracing-appender = "0.2" tracing-futures = { version = "0.2", features = ["futures-03"] } tracing-log = "0.1" -tracing-opentelemetry = "0.17" +tracing-opentelemetry = "0.22.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs index 4f7eed931350..f13cdf72dc68 100644 --- a/src/common/telemetry/src/lib.rs +++ b/src/common/telemetry/src/lib.rs @@ -16,45 +16,9 @@ pub mod logging; mod macros; pub mod metric; mod panic_hook; +pub mod tracing_context; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; - -pub use logging::{init_default_ut_logging, init_global_logging, trace_id, TRACE_ID}; +pub use logging::{init_default_ut_logging, init_global_logging}; pub use metric::dump_metrics; -use once_cell::sync::OnceCell; pub use panic_hook::set_panic_hook; -use 
parking_lot::Mutex; -use rand::random; -use snowflake::SnowflakeIdBucket; -pub use {common_error, tracing, tracing_appender, tracing_futures, tracing_subscriber}; - -static NODE_ID: OnceCell = OnceCell::new(); -static TRACE_BUCKET: OnceCell> = OnceCell::new(); - -pub fn gen_trace_id() -> u64 { - let mut bucket = TRACE_BUCKET - .get_or_init(|| { - // if node_id is not initialized, how about random one? - let node_id = NODE_ID.get_or_init(|| 0); - info!("initializing bucket with node_id: {}", node_id); - let bucket = SnowflakeIdBucket::new(1, (*node_id) as i32); - Mutex::new(bucket) - }) - .lock(); - (*bucket).get_id() as u64 -} - -pub fn init_node_id(node_id: Option) { - let node_id = node_id.map(|id| calculate_hash(&id)).unwrap_or(random()); - match NODE_ID.set(node_id) { - Ok(_) => {} - Err(_) => warn!("node_id is already initialized"), - } -} - -fn calculate_hash(t: &T) -> u64 { - let mut s = DefaultHasher::new(); - t.hash(&mut s); - s.finish() -} +pub use {common_error, tracing}; diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 2a6b5e3bc0d9..9ff9306973ad 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -17,10 +17,11 @@ use std::env; use std::sync::{Arc, Mutex, Once}; use once_cell::sync::Lazy; -use opentelemetry::global; -use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::{global, KeyValue}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_semantic_conventions::resource; use serde::{Deserialize, Serialize}; -pub use tracing::{event, span, Level}; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_log::LogTracer; @@ -29,38 +30,17 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::prelude::*; use tracing_subscriber::{filter, EnvFilter, Registry}; -pub use crate::{debug, error, 
info, log, trace, warn}; +pub use crate::{debug, error, info, trace, warn}; -tokio::task_local! { - /// Task local trace id. See [trace_id](crate::trace_id) for more details. - pub static TRACE_ID: u64; -} - -/// Get current [TRACE_ID] from tokio [task_local](tokio::task_local) storage. -/// -/// # Usage -/// To set current trace id, wrap your async code like this: -/// ```rust, no_run -/// common_telemetry::TRACE_ID -/// .scope(id, async move { -/// query_handler -/// .do_query(query, self.session.context()) -/// .await -/// }) -/// .await -/// ``` -/// Then all functions called from this stack will be able to retrieve the trace id -/// via this method. -pub fn trace_id() -> Option { - TRACE_ID.try_with(|id| Some(*id)).unwrap_or(None) -} +const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317"; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct LoggingOptions { pub dir: String, pub level: Option, - pub enable_jaeger_tracing: bool, + pub enable_otlp_tracing: bool, + pub otlp_endpoint: Option, } impl Default for LoggingOptions { @@ -68,7 +48,8 @@ impl Default for LoggingOptions { Self { dir: "/tmp/greptimedb/logs".to_string(), level: None, - enable_jaeger_tracing: false, + enable_otlp_tracing: false, + otlp_endpoint: None, } } } @@ -106,9 +87,10 @@ pub fn init_default_ut_logging() { "unittest", &opts, TracingOptions::default(), + None )); - info!("logs dir = {}", dir); + crate::info!("logs dir = {}", dir); }); } @@ -122,11 +104,12 @@ pub fn init_global_logging( app_name: &str, opts: &LoggingOptions, tracing_opts: TracingOptions, + node_id: Option, ) -> Vec { let mut guards = vec![]; let dir = &opts.dir; let level = &opts.level; - let enable_jaeger_tracing = opts.enable_jaeger_tracing; + let enable_otlp_tracing = opts.enable_otlp_tracing; // Enable log compatible layer to convert log record to tracing span. 
LogTracer::init().expect("log tracer must be valid"); @@ -204,15 +187,34 @@ pub fn init_global_logging( .with(file_logging_layer) .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)); - if enable_jaeger_tracing { - // Jaeger layer. + if enable_otlp_tracing { global::set_text_map_propagator(TraceContextPropagator::new()); - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name(app_name) - .install_batch(opentelemetry::runtime::Tokio) - .expect("install"); - let jaeger_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer)); - let subscriber = subscriber.with(jaeger_layer); + // otlp exporter + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter().tonic().with_endpoint( + opts.otlp_endpoint + .as_ref() + .map(|e| format!("http://{}", e)) + .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()), + ), + ) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource( + opentelemetry_sdk::Resource::new(vec![ + KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), + KeyValue::new( + resource::SERVICE_INSTANCE_ID, + node_id.unwrap_or("none".to_string()), + ), + KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), + ]), + )) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("otlp tracer install failed"); + let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer)); + let subscriber = subscriber.with(tracing_layer); tracing::subscriber::set_global_default(subscriber) .expect("error setting global tracing subscriber"); } else { diff --git a/src/common/telemetry/src/macros.rs b/src/common/telemetry/src/macros.rs index c4fd285b3356..9ca91354a5df 100644 --- a/src/common/telemetry/src/macros.rs +++ b/src/common/telemetry/src/macros.rs @@ -17,14 +17,12 @@ macro_rules! 
log { // log!(target: "my_target", Level::INFO, "a {} event", "log"); (target: $target:expr, $lvl:expr, $($arg:tt)+) => {{ - let _trace_id = $crate::trace_id(); - $crate::logging::event!(target: $target, $lvl, trace_id = _trace_id, $($arg)+) + $crate::tracing::event!(target: $target, $lvl, $($arg)+) }}; // log!(Level::INFO, "a log event") ($lvl:expr, $($arg:tt)+) => {{ - let _trace_id = $crate::trace_id(); - $crate::logging::event!($lvl, trace_id = _trace_id, $($arg)+) + $crate::tracing::event!($lvl, $($arg)+) }}; } @@ -33,14 +31,14 @@ macro_rules! log { macro_rules! error { // error!(target: "my_target", "a {} event", "log") (target: $target:expr, $($arg:tt)+) => ({ - $crate::log!(target: $target, $crate::logging::Level::ERROR, $($arg)+) + $crate::log!(target: $target, $crate::tracing::Level::ERROR, $($arg)+) }); // error!(e; target: "my_target", "a {} event", "log") ($e:expr; target: $target:expr, $($arg:tt)+) => ({ $crate::log!( target: $target, - $crate::logging::Level::ERROR, + $crate::tracing::Level::ERROR, err = ?$e, $($arg)+ ) @@ -50,7 +48,7 @@ macro_rules! error { (%$e:expr; target: $target:expr, $($arg:tt)+) => ({ $crate::log!( target: $target, - $crate::logging::Level::ERROR, + $crate::tracing::Level::ERROR, err = %$e, $($arg)+ ) @@ -59,7 +57,7 @@ macro_rules! error { // error!(e; "a {} event", "log") ($e:expr; $($arg:tt)+) => ({ $crate::log!( - $crate::logging::Level::ERROR, + $crate::tracing::Level::ERROR, err = ?$e, $($arg)+ ) @@ -68,7 +66,7 @@ macro_rules! error { // error!(%e; "a {} event", "log") (%$e:expr; $($arg:tt)+) => ({ $crate::log!( - $crate::logging::Level::ERROR, + $crate::tracing::Level::ERROR, err = %$e, $($arg)+ ) @@ -76,7 +74,7 @@ macro_rules! error { // error!("a {} event", "log") ($($arg:tt)+) => ({ - $crate::log!($crate::logging::Level::ERROR, $($arg)+) + $crate::log!($crate::tracing::Level::ERROR, $($arg)+) }); } @@ -85,13 +83,13 @@ macro_rules! error { macro_rules! 
warn { // warn!(target: "my_target", "a {} event", "log") (target: $target:expr, $($arg:tt)+) => { - $crate::log!(target: $target, $crate::logging::Level::WARN, $($arg)+) + $crate::log!(target: $target, $crate::tracing::Level::WARN, $($arg)+) }; // warn!(e; "a {} event", "log") ($e:expr; $($arg:tt)+) => ({ $crate::log!( - $crate::logging::Level::WARN, + $crate::tracing::Level::WARN, err = ?$e, $($arg)+ ) @@ -100,7 +98,7 @@ macro_rules! warn { // warn!(%e; "a {} event", "log") (%$e:expr; $($arg:tt)+) => ({ $crate::log!( - $crate::logging::Level::WARN, + $crate::tracing::Level::WARN, err = %$e, $($arg)+ ) @@ -108,7 +106,7 @@ macro_rules! warn { // warn!("a {} event", "log") ($($arg:tt)+) => { - $crate::log!($crate::logging::Level::WARN, $($arg)+) + $crate::log!($crate::tracing::Level::WARN, $($arg)+) }; } @@ -117,12 +115,12 @@ macro_rules! warn { macro_rules! info { // info!(target: "my_target", "a {} event", "log") (target: $target:expr, $($arg:tt)+) => { - $crate::log!(target: $target, $crate::logging::Level::INFO, $($arg)+) + $crate::log!(target: $target, $crate::tracing::Level::INFO, $($arg)+) }; // info!("a {} event", "log") ($($arg:tt)+) => { - $crate::log!($crate::logging::Level::INFO, $($arg)+) + $crate::log!($crate::tracing::Level::INFO, $($arg)+) }; } @@ -131,12 +129,12 @@ macro_rules! info { macro_rules! debug { // debug!(target: "my_target", "a {} event", "log") (target: $target:expr, $($arg:tt)+) => { - $crate::log!(target: $target, $crate::logging::Level::DEBUG, $($arg)+) + $crate::log!(target: $target, $crate::tracing::Level::DEBUG, $($arg)+) }; // debug!("a {} event", "log") ($($arg:tt)+) => { - $crate::log!($crate::logging::Level::DEBUG, $($arg)+) + $crate::log!($crate::tracing::Level::DEBUG, $($arg)+) }; } @@ -145,12 +143,12 @@ macro_rules! debug { macro_rules! 
trace { // trace!(target: "my_target", "a {} event", "log") (target: $target:expr, $($arg:tt)+) => { - $crate::log!(target: $target, $crate::logging::Level::TRACE, $($arg)+) + $crate::log!(target: $target, $crate::tracing::Level::TRACE, $($arg)+) }; // trace!("a {} event", "log") ($($arg:tt)+) => { - $crate::log!($crate::logging::Level::TRACE, $($arg)+) + $crate::log!($crate::tracing::Level::TRACE, $($arg)+) }; } @@ -158,8 +156,7 @@ macro_rules! trace { mod tests { use common_error::mock::MockError; use common_error::status_code::StatusCode; - - use crate::logging::Level; + use tracing::Level; macro_rules! all_log_macros { ($($arg:tt)*) => { diff --git a/src/common/telemetry/src/tracing_context.rs b/src/common/telemetry/src/tracing_context.rs new file mode 100644 index 000000000000..b6aed81c858e --- /dev/null +++ b/src/common/telemetry/src/tracing_context.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tracing utilities, inspired by RisingWave +use std::collections::HashMap; + +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +// A wrapper for `Future`s that provides tracing instrument adapters. 
+pub trait FutureExt: std::future::Future + Sized { + fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented; +} + +impl FutureExt for T { + #[inline] + fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented { + tracing::instrument::Instrument::instrument(self, span) + } +} + +/// Context for tracing used for propagating tracing information in a distributed system. +/// +/// Generally, the caller of a service should create a tracing context from the current tracing span +/// and pass it to the callee through the network. The callee will then attach its local tracing +/// span as a child of the tracing context, so that the external tracing service can associate them +/// in a single trace. +/// +/// The tracing context must be serialized into the W3C trace context format and passed in rpc +/// message headers when communicating between frontend, datanode and meta. +/// +/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information. +#[derive(Debug, Clone)] +pub struct TracingContext(opentelemetry::Context); + +pub type W3cTrace = HashMap; + +impl Default for TracingContext { + fn default() -> Self { + Self::new() + } +} + +type Propagator = TraceContextPropagator; + +impl TracingContext { + /// Create a new tracing context from a tracing span. + pub fn from_span(span: &tracing::Span) -> Self { + Self(span.context()) + } + + /// Create a new tracing context from the current tracing span considered by the subscriber. + pub fn from_current_span() -> Self { + Self::from_span(&tracing::Span::current()) + } + + /// Create a no-op tracing context. + pub fn new() -> Self { + Self(opentelemetry::Context::new()) + } + + /// Attach the given span as a child of the context. Returns the attached span. + pub fn attach(&self, span: tracing::Span) -> tracing::Span { + span.set_parent(self.0.clone()); + span + } + + /// Convert the tracing context to the W3C trace context format. 
+ pub fn to_w3c(&self) -> W3cTrace { + let mut fields = HashMap::new(); + Propagator::new().inject_context(&self.0, &mut fields); + fields + } + + /// Create a new tracing context from the W3C trace context format. + pub fn from_w3c(fields: &W3cTrace) -> Self { + let context = Propagator::new().extract(fields); + Self(context) + } +} diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index bcd9ac20cf0b..7e2925af6f74 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -28,6 +28,8 @@ use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::{DfPhysicalPlan, Output}; use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; +use common_telemetry::tracing::{self, info_span}; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, warn}; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; @@ -99,6 +101,7 @@ impl RegionServer { self.inner.handle_request(region_id, request).await } + #[tracing::instrument(skip_all)] pub async fn handle_read(&self, request: QueryRequest) -> Result { self.inner.handle_read(request).await } @@ -154,9 +157,19 @@ impl RegionServerHandler for RegionServer { .context(BuildRegionRequestsSnafu) .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu)?; + let tracing_context = TracingContext::from_current_span(); let join_tasks = requests.into_iter().map(|(region_id, req)| { let self_to_move = self.clone(); - async move { self_to_move.handle_request(region_id, req).await } + let span = tracing_context.attach(info_span!( + "RegionServer::handle_region_request", + region_id = region_id.to_string() + )); + async move { + self_to_move + .handle_request(region_id, req) + .trace(span) + .await + } }); let results = try_join_all(join_tasks) @@ -198,15 +211,18 @@ impl FlightCraft for RegionServer { let ticket = request.into_inner().ticket; let request = 
QueryRequest::decode(ticket.as_ref()) .context(servers_error::InvalidFlightTicketSnafu)?; - let trace_id = request + let tracing_context = request .header .as_ref() - .map(|h| h.trace_id) + .map(|h| TracingContext::from_w3c(&h.tracing_context)) .unwrap_or_default(); - let result = self.handle_read(request).await?; + let result = self + .handle_read(request) + .trace(tracing_context.attach(info_span!("RegionServer::handle_read"))) + .await?; - let stream = Box::pin(FlightRecordBatchStream::new(result, trace_id)); + let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context)); Ok(Response::new(stream)) } } @@ -283,6 +299,10 @@ impl RegionServerInner { let result = engine .handle_request(region_id, request) + .trace(info_span!( + "RegionEngine::handle_region_request", + engine_type + )) .await .with_context(|_| HandleRegionRequestSnafu { region_id })?; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 2fbdbfc02233..8f487dc782a9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -207,8 +207,6 @@ impl Instance { Arc::new(handlers_executor), )); - common_telemetry::init_node_id(opts.node_id.clone()); - Ok(Instance { catalog_manager, script_executor, diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index f7d121e5f4a6..29ce275b2cb8 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -27,6 +27,8 @@ use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::{Sequence, SequenceRef}; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::tracing; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; use snafu::{OptionExt, ResultExt}; @@ -71,8 +73,15 @@ impl RegionInvoker { #[async_trait] impl Datanode for RegionInvoker { async fn 
handle(&self, request: RegionRequest) -> MetaResult { + let span = request + .header + .as_ref() + .map(|h| TracingContext::from_w3c(&h.tracing_context)) + .unwrap_or_default() + .attach(tracing::info_span!("RegionInvoker::handle_region_request")); let response = self .handle_inner(request) + .trace(span) .await .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; @@ -83,8 +92,15 @@ impl Datanode for RegionInvoker { } async fn handle_query(&self, request: QueryRequest) -> MetaResult { + let span = request + .header + .as_ref() + .map(|h| TracingContext::from_w3c(&h.tracing_context)) + .unwrap_or_default() + .attach(tracing::info_span!("RegionInvoker::handle_query")); self.region_server .handle_read(request) + .trace(span) .await .map_err(BoxedError::new) .context(meta_error::ExternalSnafu) diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs index 40c731f2cd90..425d2a0634fb 100644 --- a/src/meta-client/src/client/ask_leader.rs +++ b/src/meta-client/src/client/ask_leader.rs @@ -19,6 +19,7 @@ use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::{AskLeaderRequest, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; +use common_telemetry::tracing_context::TracingContext; use common_telemetry::warn; use rand::seq::SliceRandom; use snafu::{OptionExt, ResultExt}; @@ -77,7 +78,11 @@ impl AskLeader { peers.shuffle(&mut rand::thread_rng()); let req = AskLeaderRequest { - header: Some(RequestHeader::new(self.id, self.role)), + header: Some(RequestHeader::new( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + )), }; let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len()); diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs index 7150d1808dfa..d36080b1b4c9 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/ddl.rs @@ -17,6 
+17,7 @@ use std::sync::Arc; use api::v1::meta::ddl_task_client::DdlTaskClient; use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_grpc::channel_manager::ChannelManager; +use common_telemetry::tracing_context::TracingContext; use common_telemetry::{info, warn}; use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; @@ -133,7 +134,11 @@ impl Inner { } ); - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let ask_leader = self.ask_leader.as_ref().unwrap(); let mut times = 0; diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 2c36fb24b67b..8b873e48da1b 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; use common_meta::rpc::util; use common_telemetry::info; +use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::{mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; @@ -49,7 +50,11 @@ impl HeartbeatSender { #[inline] pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> { - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); self.sender.send(req).await.map_err(|e| { error::SendHeartbeatSnafu { err_msg: e.to_string(), @@ -207,7 +212,11 @@ impl Inner { let (sender, receiver) = mpsc::channel::(128); - let header = RequestHeader::new(self.id, self.role); + let header = RequestHeader::new( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let handshake = HeartbeatRequest { header: Some(header), ..Default::default() diff --git a/src/meta-client/src/client/lock.rs b/src/meta-client/src/client/lock.rs index 
8f0912e56752..04c4aaa34c9c 100644 --- a/src/meta-client/src/client/lock.rs +++ b/src/meta-client/src/client/lock.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use api::v1::meta::lock_client::LockClient; use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse}; use common_grpc::channel_manager::ChannelManager; +use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; use tonic::transport::Channel; @@ -127,7 +128,11 @@ impl Inner { async fn lock(&self, mut req: LockRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.lock(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) @@ -135,7 +140,11 @@ impl Inner { async fn unlock(&self, mut req: UnlockRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.unlock(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index 53219ae464ec..a03c88fbf1bb 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -22,6 +22,7 @@ use api::v1::meta::{ DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role, }; use common_grpc::channel_manager::ChannelManager; +use common_telemetry::tracing_context::TracingContext; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; use tonic::transport::Channel; @@ -134,7 +135,11 @@ impl Inner { async fn range(&self, mut req: RangeRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = 
client.range(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) @@ -142,7 +147,11 @@ impl Inner { async fn put(&self, mut req: PutRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.put(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) @@ -150,7 +159,11 @@ impl Inner { async fn batch_get(&self, mut req: BatchGetRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.batch_get(req).await.map_err(error::Error::from)?; @@ -159,7 +172,11 @@ impl Inner { async fn batch_put(&self, mut req: BatchPutRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.batch_put(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) @@ -167,7 +184,11 @@ impl Inner { async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.batch_delete(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) @@ -178,7 +199,11 @@ impl Inner { mut req: CompareAndPutRequest, ) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client .compare_and_put(req) .await @@ -189,7 +214,11 @@ impl Inner { async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id, self.role); + req.set_header( + self.id, + 
self.role, + TracingContext::from_current_span().to_w3c(), + ); let res = client.delete_range(req).await.map_err(error::Error::from)?; Ok(res.into_inner()) diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index a945c50ef8e9..ce1731411e43 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -53,6 +53,7 @@ mod tests { use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::Sequence; + use common_telemetry::tracing_context::W3cTrace; use super::*; use crate::cluster::MetaPeerClientBuilder; @@ -89,7 +90,7 @@ mod tests { }; let req = HeartbeatRequest { - header: Some(RequestHeader::new((1, 2), Role::Datanode)), + header: Some(RequestHeader::new((1, 2), Role::Datanode, W3cTrace::new())), ..Default::default() }; let mut acc = HeartbeatAccumulator::default(); diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index 9f82f8ec4d58..44f81032b9f9 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -45,6 +45,7 @@ impl ddl_task_server::DdlTask for MetaSrv { .submit_ddl_task( &ExecutorContext { cluster_id: Some(cluster_id), + tracing_context: Some(header.tracing_context), }, SubmitDdlTaskRequest { task }, ) diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index f250d1b7e9d2..2e5d82ac37f1 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -168,6 +168,7 @@ mod tests { use api::v1::meta::heartbeat_server::Heartbeat; use api::v1::meta::*; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_telemetry::tracing_context::W3cTrace; use tonic::IntoRequest; use super::get_node_id; @@ -184,7 +185,7 @@ mod tests { .unwrap(); let req = AskLeaderRequest { - header: Some(RequestHeader::new((1, 1), Role::Datanode)), + header: 
Some(RequestHeader::new((1, 1), Role::Datanode, W3cTrace::new())), }; let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 55a0c555a0bb..dfee0bc58e8d 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -257,6 +257,7 @@ mod tests { use api::v1::meta::store_server::Store; use api::v1::meta::*; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_telemetry::tracing_context::W3cTrace; use tonic::IntoRequest; use crate::metasrv::builder::MetaSrvBuilder; @@ -275,7 +276,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = RangeRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = meta_srv.range(req.into_request()).await; let _ = res.unwrap(); @@ -286,7 +287,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = PutRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = meta_srv.put(req.into_request()).await; let _ = res.unwrap(); @@ -297,7 +298,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = BatchGetRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = meta_srv.batch_get(req.into_request()).await; let _ = res.unwrap(); @@ -308,7 +309,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = BatchPutRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = meta_srv.batch_put(req.into_request()).await; let _ = res.unwrap(); @@ -319,7 +320,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = BatchDeleteRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = 
meta_srv.batch_delete(req.into_request()).await; let _ = res.unwrap(); @@ -330,7 +331,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = CompareAndPutRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = meta_srv.compare_and_put(req.into_request()).await; let _ = res.unwrap(); @@ -341,7 +342,7 @@ mod tests { let meta_srv = new_meta_srv().await; let mut req = DeleteRangeRequest::default(); - req.set_header((1, 1), Role::Datanode); + req.set_header((1, 1), Role::Datanode, W3cTrace::new()); let res = meta_srv.delete_range(req.into_request()).await; let _ = res.unwrap(); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 50bc0f8133c6..023daca2810b 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -17,8 +17,7 @@ use std::time::Duration; use common_query::Output; -use common_telemetry::info; -use common_telemetry::tracing::warn; +use common_telemetry::{info, warn}; use futures::TryStreamExt; use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 849760bf53dc..4b4feb10b0ea 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -22,6 +22,7 @@ use catalog::CatalogManagerRef; use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; use common_meta::peer::Peer; use common_query::Output; +use common_telemetry::tracing_context::TracingContext; use futures_util::future; use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; @@ -119,8 +120,10 @@ impl Deleter { requests: RegionDeleteRequests, ctx: &QueryContextRef, ) -> Result { - let header: RegionRequestHeader = ctx.as_ref().into(); - let request_factory = RegionRequestFactory::new(header); + let request_factory = RegionRequestFactory::new(RegionRequestHeader { + tracing_context: 
TracingContext::from_current_span().to_w3c(), + dbname: ctx.get_db_string(), + }); let tasks = self .group_requests_by_peer(requests) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 078727997e97..a17839ed64ae 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -26,6 +26,7 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; use common_meta::peer::Peer; use common_query::Output; +use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info}; use datatypes::schema::Schema; use futures_util::future; @@ -152,8 +153,10 @@ impl Inserter { ctx: &QueryContextRef, ) -> Result { write_meter!(ctx.current_catalog(), ctx.current_schema(), requests); - let header: RegionRequestHeader = ctx.as_ref().into(); - let request_factory = RegionRequestFactory::new(header); + let request_factory = RegionRequestFactory::new(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + dbname: ctx.get_db_string(), + }); let tasks = self .group_requests_by_peer(requests) diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index adac0a0f40f1..ca31e7a18ac0 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -32,6 +32,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::table_name::TableName; use common_query::Output; +use common_telemetry::tracing; use common_time::range::TimestampRange; use common_time::Timestamp; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; @@ -88,6 +89,7 @@ impl StatementExecutor { } } + #[tracing::instrument(skip_all)] pub async fn execute_stmt( &self, stmt: QueryStatement, @@ -201,6 +203,7 @@ impl StatementExecutor { .context(PlanStatementSnafu) } + #[tracing::instrument(skip_all)] async fn plan_exec(&self, stmt: 
QueryStatement, query_ctx: QueryContextRef) -> Result { let plan = self.plan(stmt, query_ctx.clone()).await?; self.query_engine diff --git a/src/operator/src/statement/backup.rs b/src/operator/src/statement/backup.rs index 7d34d376f59c..d970e28bd740 100644 --- a/src/operator/src/statement/backup.rs +++ b/src/operator/src/statement/backup.rs @@ -14,7 +14,7 @@ use common_datasource::file_format::Format; use common_query::Output; -use common_telemetry::info; +use common_telemetry::{info, tracing}; use session::context::QueryContextBuilder; use snafu::{ensure, ResultExt}; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; @@ -27,6 +27,7 @@ pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; impl StatementExecutor { + #[tracing::instrument(skip_all)] pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result { // location must end with / so that every table is exported to a file. 
ensure!( diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 9f34627ee4eb..2474ee3483de 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -29,7 +29,7 @@ use common_datasource::object_store::{build_backend, parse_url}; use common_datasource::util::find_dir_and_filename; use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter; use common_recordbatch::DfSendableRecordBatchStream; -use common_telemetry::debug; +use common_telemetry::{debug, tracing}; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream}; @@ -237,6 +237,7 @@ impl StatementExecutor { } } + #[tracing::instrument(skip_all)] pub async fn copy_table_from( &self, req: CopyTableRequest, diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index ca6830d5ecdb..f70dada86c20 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -23,7 +23,7 @@ use common_datasource::util::find_dir_and_filename; use common_query::Output; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; +use common_telemetry::{debug, tracing}; use datafusion::datasource::DefaultTableSource; use datafusion_common::TableReference as DfTableReference; use datafusion_expr::LogicalPlanBuilder; @@ -84,6 +84,7 @@ impl StatementExecutor { } } + #[tracing::instrument(skip_all)] pub(crate) async fn copy_table_to( &self, req: CopyTableRequest, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 1932e956fc82..c31ee9afae94 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -28,7 +28,7 @@ use 
common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::table_name::TableName; use common_query::Output; -use common_telemetry::info; +use common_telemetry::{info, tracing}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; use partition::partition::{PartitionBound, PartitionDef}; @@ -58,11 +58,13 @@ impl StatementExecutor { self.catalog_manager.clone() } + #[tracing::instrument(skip_all)] pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result { let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx)?; self.create_table_inner(create_expr, stmt.partitions).await } + #[tracing::instrument(skip_all)] pub async fn create_external_table( &self, create_expr: CreateExternalTable, @@ -151,6 +153,7 @@ impl StatementExecutor { Ok(table) } + #[tracing::instrument(skip_all)] pub async fn drop_table(&self, table_name: TableName) -> Result { let table = self .catalog_manager @@ -181,6 +184,7 @@ impl StatementExecutor { Ok(Output::AffectedRows(0)) } + #[tracing::instrument(skip_all)] pub async fn truncate_table(&self, table_name: TableName) -> Result { let table = self .catalog_manager @@ -221,6 +225,7 @@ impl StatementExecutor { Ok(()) } + #[tracing::instrument(skip_all)] pub async fn alter_table( &self, alter_table: AlterTable, @@ -347,6 +352,7 @@ impl StatementExecutor { .context(error::ExecuteDdlSnafu) } + #[tracing::instrument(skip_all)] pub async fn create_database( &self, catalog: &str, diff --git a/src/operator/src/statement/describe.rs b/src/operator/src/statement/describe.rs index 452388b701b3..7a28aa2d8388 100644 --- a/src/operator/src/statement/describe.rs +++ b/src/operator/src/statement/describe.rs @@ -14,6 +14,7 @@ use common_error::ext::BoxedError; use common_query::Output; +use common_telemetry::tracing; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use 
sql::statements::describe::DescribeTable; @@ -26,6 +27,7 @@ use crate::statement::StatementExecutor; use crate::table::table_idents_to_full_name; impl StatementExecutor { + #[tracing::instrument(skip_all)] pub(super) async fn describe_table( &self, stmt: DescribeTable, diff --git a/src/operator/src/statement/dml.rs b/src/operator/src/statement/dml.rs index 197ac5c03b11..69ada96c803b 100644 --- a/src/operator/src/statement/dml.rs +++ b/src/operator/src/statement/dml.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_query::Output; +use common_telemetry::tracing; use query::parser::QueryStatement; use session::context::QueryContextRef; use sql::statements::insert::Insert; @@ -22,6 +23,7 @@ use super::StatementExecutor; use crate::error::Result; impl StatementExecutor { + #[tracing::instrument(skip_all)] pub async fn insert(&self, insert: Box, query_ctx: QueryContextRef) -> Result { if insert.can_extract_values() { // Fast path: plain insert ("insert with literal values") is executed directly diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index 0d6fa2288055..3841d721c1e0 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -14,6 +14,7 @@ use common_meta::table_name::TableName; use common_query::Output; +use common_telemetry::tracing; use partition::manager::PartitionInfo; use partition::partition::PartitionBound; use session::context::QueryContextRef; @@ -28,6 +29,7 @@ use crate::error::{self, ExecuteStatementSnafu, Result}; use crate::statement::StatementExecutor; impl StatementExecutor { + #[tracing::instrument(skip_all)] pub(super) async fn show_databases( &self, stmt: ShowDatabases, @@ -38,6 +40,7 @@ impl StatementExecutor { .context(ExecuteStatementSnafu) } + #[tracing::instrument(skip_all)] pub(super) async fn show_tables( &self, stmt: ShowTables, @@ -48,6 +51,7 @@ impl StatementExecutor { .context(ExecuteStatementSnafu) } + #[tracing::instrument(skip_all)] pub async fn 
show_create_table( &self, table_name: TableName, diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index 9b7f72c821db..6050f48f5efe 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use common_query::Output; +use common_telemetry::tracing; use query::parser::{PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, EXPLAIN_NODE_NAME}; use session::context::QueryContextRef; use snafu::ResultExt; @@ -24,6 +25,7 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re use crate::statement::StatementExecutor; impl StatementExecutor { + #[tracing::instrument(skip_all)] pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { let stmt = match tql { Tql::Eval(eval) => { diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index caa72f22a137..b0b7c55ebfb1 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -34,6 +34,7 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{ EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream, }; +use common_telemetry::tracing; use datafusion::common::Column; use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -77,6 +78,7 @@ impl DatafusionQueryEngine { Self { state, plugins } } + #[tracing::instrument(skip_all)] async fn exec_query_plan( &self, plan: LogicalPlan, @@ -97,6 +99,7 @@ impl DatafusionQueryEngine { Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?)) } + #[tracing::instrument(skip_all)] async fn exec_dml_statement( &self, dml: DmlStatement, @@ -147,6 +150,7 @@ impl DatafusionQueryEngine { Ok(Output::AffectedRows(affected_rows)) } + #[tracing::instrument(skip_all)] async fn delete<'a>( &self, table_name: &ResolvedTableReference<'a>, @@ -189,6 +193,7 @@ impl 
DatafusionQueryEngine { .await } + #[tracing::instrument(skip_all)] async fn insert<'a>( &self, table_name: &ResolvedTableReference<'a>, @@ -285,6 +290,7 @@ impl QueryEngine for DatafusionQueryEngine { } impl LogicalOptimizer for DatafusionQueryEngine { + #[tracing::instrument(skip_all)] fn optimize(&self, plan: &LogicalPlan) -> Result { let _timer = metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED.start_timer(); match plan { @@ -305,6 +311,7 @@ impl LogicalOptimizer for DatafusionQueryEngine { #[async_trait::async_trait] impl PhysicalPlanner for DatafusionQueryEngine { + #[tracing::instrument(skip_all)] async fn create_physical_plan( &self, ctx: &mut QueryEngineContext, @@ -338,6 +345,7 @@ impl PhysicalPlanner for DatafusionQueryEngine { } impl PhysicalOptimizer for DatafusionQueryEngine { + #[tracing::instrument(skip_all)] fn optimize_physical_plan( &self, ctx: &mut QueryEngineContext, @@ -385,6 +393,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine { } impl QueryExecutor for DatafusionQueryEngine { + #[tracing::instrument(skip_all)] fn execute_stream( &self, ctx: &QueryEngineContext, diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 91a7cb444177..845e7b813d6c 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -27,7 +27,8 @@ use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream, }; -use common_telemetry::trace_id; +use common_telemetry::tracing; +use common_telemetry::tracing_context::TracingContext; use datafusion::physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, }; @@ -155,6 +156,7 @@ impl MergeScanExec { }) } + #[tracing::instrument(skip_all)] pub fn to_stream(&self, context: Arc) -> Result { let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); @@ -163,7 +165,8 @@ impl MergeScanExec { 
let schema = Self::arrow_schema_to_schema(self.schema())?; let dbname = context.task_id().unwrap_or_default(); - let trace_id = trace_id().unwrap_or_default(); + + let tracing_context = TracingContext::from_current_span().to_w3c(); let stream = Box::pin(stream!({ METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64); @@ -174,8 +177,7 @@ impl MergeScanExec { for region_id in regions { let request = QueryRequest { header: Some(RegionRequestHeader { - trace_id, - span_id: 0, + tracing_context: tracing_context.clone(), dbname: dbname.clone(), }), region_id: region_id.into(), diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 20749446ec96..9e5c1a3f9dab 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::table_source::DfTableSourceProvider; use common_error::ext::BoxedError; +use common_telemetry::tracing; use datafusion::execution::context::SessionState; use datafusion_sql::planner::{ParserOptions, SqlToRel}; use promql::planner::PromPlanner; @@ -51,6 +52,7 @@ impl DfLogicalPlanner { } } + #[tracing::instrument(skip_all)] async fn plan_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { let df_stmt = (&stmt).try_into().context(SqlSnafu)?; @@ -85,6 +87,7 @@ impl DfLogicalPlanner { Ok(LogicalPlan::DfPlan(plan)) } + #[tracing::instrument(skip_all)] async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result { let table_provider = DfTableSourceProvider::new( self.engine_state.catalog_manager().clone(), @@ -101,6 +104,7 @@ impl DfLogicalPlanner { #[async_trait] impl LogicalPlanner for DfLogicalPlanner { + #[tracing::instrument(skip_all)] async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result { match stmt { QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await, diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index cea21a8a00c6..dd576ee6c725 100644 --- 
a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -26,6 +26,7 @@ use arrow_flight::{ use async_trait::async_trait; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::Output; +use common_telemetry::tracing_context::TracingContext; use futures::Stream; use prost::Message; use snafu::ResultExt; @@ -150,28 +151,26 @@ impl FlightCraft for GreptimeRequestHandler { let ticket = request.into_inner().ticket; let request = GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?; - let trace_id = request - .header - .as_ref() - .map(|h| h.trace_id) - .unwrap_or_default(); let output = self.handle_request(request).await?; let stream: Pin> + Send + Sync>> = - to_flight_data_stream(output, trace_id); + to_flight_data_stream(output, TracingContext::new()); Ok(Response::new(stream)) } } -fn to_flight_data_stream(output: Output, trace_id: u64) -> TonicStream { +fn to_flight_data_stream( + output: Output, + tracing_context: TracingContext, +) -> TonicStream { match output { Output::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream, trace_id); + let stream = FlightRecordBatchStream::new(stream, tracing_context); Box::pin(stream) as _ } Output::RecordBatches(x) => { - let stream = FlightRecordBatchStream::new(x.as_stream(), trace_id); + let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context); Box::pin(stream) as _ } Output::AffectedRows(rows) => { diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index 542b031df887..3024f5437aba 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -18,7 +18,9 @@ use std::task::{Context, Poll}; use arrow_flight::FlightData; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::{warn, TRACE_ID}; +use common_telemetry::tracing::info_span; +use 
common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::warn; use futures::channel::mpsc; use futures::channel::mpsc::Sender; use futures::{SinkExt, Stream, StreamExt}; @@ -39,11 +41,13 @@ pub struct FlightRecordBatchStream { } impl FlightRecordBatchStream { - pub fn new(recordbatches: SendableRecordBatchStream, trace_id: u64) -> Self { + pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self { let (tx, rx) = mpsc::channel::>(1); - let join_handle = common_runtime::spawn_read(TRACE_ID.scope(trace_id, async move { - Self::flight_data_stream(recordbatches, tx).await - })); + let join_handle = common_runtime::spawn_read(async move { + Self::flight_data_stream(recordbatches, tx) + .trace(tracing_context.attach(info_span!("flight_data_stream"))) + .await + }); Self { rx, join_handle, @@ -145,7 +149,7 @@ mod test { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]) .unwrap() .as_stream(); - let mut stream = FlightRecordBatchStream::new(recordbatches, 0); + let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default()); let mut raw_data = Vec::with_capacity(2); raw_data.push(stream.next().await.unwrap().unwrap()); diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index d56875906e71..36c518996494 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -27,7 +27,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; use common_runtime::Runtime; -use common_telemetry::{logging, TRACE_ID}; +use common_telemetry::logging; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; @@ -70,7 +70,6 @@ impl GreptimeRequestHandler { let request_type = request_type(&query).to_string(); let db = query_ctx.get_db_string(); let timer = RequestTimer::new(db.clone(), 
request_type); - let trace_id = query_ctx.trace_id(); // Executes requests in another runtime to // 1. prevent the execution from being cancelled unexpected by Tonic runtime; @@ -79,7 +78,7 @@ impl GreptimeRequestHandler { // - Obtaining a `JoinHandle` to get the panic message (if there's any). // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. // 2. avoid the handler blocks the gRPC runtime incidentally. - let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move { + let handle = self.runtime.spawn(async move { handler.do_query(query, query_ctx).await.map_err(|e| { if e.status_code().should_log_error() { logging::error!(e; "Failed to handle request"); @@ -89,7 +88,7 @@ impl GreptimeRequestHandler { } e }) - })); + }); handle.await.context(JoinTaskSnafu).map_err(|e| { timer.record(e.status_code()); @@ -166,7 +165,6 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte QueryContextBuilder::default() .current_catalog(catalog.to_string()) .current_schema(schema.to_string()) - .try_trace_id(header.map(|h| h.trace_id)) .build() } diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index c8210ebbab97..30c6eb97cbe0 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -19,7 +19,9 @@ use api::v1::region::{region_request, RegionRequest, RegionResponse}; use async_trait::async_trait; use common_error::ext::ErrorExt; use common_runtime::Runtime; -use common_telemetry::{debug, error, TRACE_ID}; +use common_telemetry::tracing::info_span; +use common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::{debug, error}; use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; @@ -45,12 +47,14 @@ impl RegionServerRequestHandler { } async fn handle(&self, request: RegionRequest) -> Result { - let trace_id = request - .header - .context(InvalidQuerySnafu { - reason: "Expecting 
non-empty region request header.", - })? - .trace_id; + let tracing_context = TracingContext::from_w3c( + &request + .header + .context(InvalidQuerySnafu { + reason: "Expecting non-empty region request header.", + })? + .tracing_context, + ); let query = request.body.context(InvalidQuerySnafu { reason: "Expecting non-empty region request body.", })?; @@ -64,17 +68,21 @@ impl RegionServerRequestHandler { // - Obtaining a `JoinHandle` to get the panic message (if there's any). // From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped. // 2. avoid the handler blocks the gRPC runtime incidentally. - let handle = self.runtime.spawn(TRACE_ID.scope(trace_id, async move { - handler.handle(query).await.map_err(|e| { - if e.status_code().should_log_error() { - error!(e; "Failed to handle request"); - } else { - // Currently, we still print a debug log. - debug!("Failed to handle request, err: {:?}", e); - } - e - }) - })); + let handle = self.runtime.spawn(async move { + handler + .handle(query) + .trace(tracing_context.attach(info_span!("RegionServerRequestHandler::handle"))) + .await + .map_err(|e| { + if e.status_code().should_log_error() { + error!(e; "Failed to handle request"); + } else { + // Currently, we still print a debug log. + debug!("Failed to handle request, err: {}", e); + } + e + }) + }); handle.await.context(JoinTaskSnafu)? 
} diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index a42c1d92f540..80a2964ca7c5 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime}; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_query::Output; -use common_telemetry::{error, logging, warn}; +use common_telemetry::{error, logging, tracing, warn}; use datatypes::prelude::ConcreteDataType; use opensrv_mysql::{ AsyncMysqlShim, Column, ErrorKind, InitWriter, ParamParser, ParamValue, QueryResultWriter, @@ -91,18 +91,14 @@ impl MysqlInstanceShim { } } + #[tracing::instrument(skip_all)] async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { if let Some(output) = crate::mysql::federated::check(query, query_ctx.clone(), self.session.clone()) { vec![Ok(output)] } else { - let trace_id = query_ctx.trace_id(); - common_telemetry::TRACE_ID - .scope(trace_id, async move { - self.query_handler.do_query(query, query_ctx).await - }) - .await + self.query_handler.do_query(query, query_ctx).await } } diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 189fe180f319..bfb8a6036aba 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -37,8 +37,6 @@ pub struct QueryContext { current_user: ArcSwap>, time_zone: Option, sql_dialect: Box, - trace_id: u64, - span_id: u64, } impl Display for QueryContext { @@ -61,18 +59,6 @@ impl From<&RegionRequestHeader> for QueryContext { current_user: Default::default(), time_zone: Default::default(), sql_dialect: Box::new(GreptimeDbDialect {}), - trace_id: value.trace_id, - span_id: value.span_id, - } - } -} - -impl From<&QueryContext> for RegionRequestHeader { - fn from(value: &QueryContext) -> Self { - RegionRequestHeader { - trace_id: value.trace_id, - span_id: value.span_id, - dbname: value.get_db_string(), } } } @@ -142,16 +128,6 @@ impl QueryContext { 
pub fn set_current_user(&self, user: Option) { let _ = self.current_user.swap(Arc::new(user)); } - - #[inline] - pub fn trace_id(&self) -> u64 { - self.trace_id - } - - #[inline] - pub fn span_id(&self) -> u64 { - self.span_id - } } impl QueryContextBuilder { @@ -170,15 +146,8 @@ impl QueryContextBuilder { sql_dialect: self .sql_dialect .unwrap_or_else(|| Box::new(GreptimeDbDialect {})), - trace_id: self.trace_id.unwrap_or_else(common_telemetry::gen_trace_id), - span_id: self.span_id.unwrap_or_default(), }) } - - pub fn try_trace_id(mut self, trace_id: Option) -> Self { - self.trace_id = trace_id; - self - } } #[derive(Debug)] diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs index d63af5cf958d..38fe1d986991 100644 --- a/src/storage/src/compaction.rs +++ b/src/storage/src/compaction.rs @@ -21,7 +21,7 @@ mod writer; use std::sync::Arc; -use common_telemetry::tracing::log::warn; +use common_telemetry::warn; use common_time::timestamp::TimeUnit; use common_time::Timestamp; pub use picker::{LeveledTimeWindowPicker, Picker, PickerContext}; diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 52ac32c83844..c810cc99c846 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -17,8 +17,7 @@ use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::time::Duration; -use common_telemetry::tracing::log::warn; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; diff --git a/src/storage/src/compaction/twcs.rs b/src/storage/src/compaction/twcs.rs index 17b7096aba44..f263cdecaf87 100644 --- a/src/storage/src/compaction/twcs.rs +++ b/src/storage/src/compaction/twcs.rs @@ -18,8 +18,7 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; -use 
common_telemetry::tracing::warn; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, info, warn}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f6c5fb7e737c..c8f7ed871756 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -677,7 +677,7 @@ enable = true enable = true [frontend.logging] -enable_jaeger_tracing = false +enable_otlp_tracing = false [frontend.datanode.client] timeout = "10s" @@ -750,10 +750,10 @@ sst_write_buffer_size = "8MiB" [datanode.region_engine.file] [datanode.logging] -enable_jaeger_tracing = false +enable_otlp_tracing = false [logging] -enable_jaeger_tracing = false"#, +enable_otlp_tracing = false"#, store_type, num_cpus::get() / 2 );