From 067917845f881fde6257693d67c891f8d1232c89 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Sun, 8 Oct 2023 14:30:23 +0800 Subject: [PATCH] fix: carry `dbname` from frontend to datanode (#2520) * chore: add dbname in region request header for tracking purpose * chore: fix handle read * chore: add write meter * chore: add meter-core to dep * chore: add converter between RegionRequestHeader and QueryContext & update proto version --- Cargo.lock | 3 ++- Cargo.toml | 2 +- src/datanode/src/region_server.rs | 11 ++++++--- src/operator/Cargo.toml | 6 ++--- src/operator/src/delete.rs | 9 ++++--- src/operator/src/insert.rs | 13 +++++----- src/query/src/dist_plan/merge_scan.rs | 14 ++++++++--- src/query/src/query_engine/context.rs | 4 ++-- src/session/Cargo.toml | 1 + src/session/src/context.rs | 34 ++++++++++++++++++++++++++- 10 files changed, 71 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a94bd04889cd..ce662bce8b68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4175,7 +4175,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=693128abe9adc70ba636010a172c9da55b206bba#693128abe9adc70ba636010a172c9da55b206bba" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1f1dd532a111e3834cc3019c5605e2993ffb9dc3#1f1dd532a111e3834cc3019c5605e2993ffb9dc3" dependencies = [ "prost", "serde", @@ -8899,6 +8899,7 @@ dependencies = [ name = "session" version = "0.4.0-nightly" dependencies = [ + "api", "arc-swap", "auth", "common-catalog", diff --git a/Cargo.toml b/Cargo.toml index 6fdd77e2056e..7e454a62f97b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "693128abe9adc70ba636010a172c9da55b206bba" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 1fe9fa562faa..ef489499bfac 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -44,7 +44,7 @@ use query::QueryEngineRef; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; -use session::context::QueryContext; +use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; @@ -285,12 +285,17 @@ impl RegionServerInner { // TODO(ruihang): add metrics and set trace id let QueryRequest { - header: _, + header, region_id, plan, } = request; let region_id = RegionId::from_u64(region_id); + let ctx: QueryContextRef = header + .as_ref() + .map(|h| Arc::new(h.into())) + .unwrap_or_else(|| QueryContextBuilder::default().build()); + // build dummy catalog list let engine = self .region_map @@ -306,7 +311,7 @@ impl RegionServerInner { .context(DecodeLogicalPlanSnafu)?; let result = self .query_engine - .execute(logical_plan.into(), QueryContext::arc()) + .execute(logical_plan.into(), ctx) .await .context(ExecuteLogicalPlanSnafu)?; diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 2de2f326745b..26e48c3b4187 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -35,6 +35,8 @@ file-engine = { workspace = true } futures = "0.3" futures-util.workspace = true meta-client = { workspace = true } +meter-core.workspace = true +meter-macros.workspace = true metrics.workspace = true object-store = { workspace = true } partition = { workspace = true } @@ -53,7 +55,3 @@ substrait = { workspace = true } table = { workspace = true } tokio.workspace = true tonic.workspace = true - -[dev-dependencies] -meter-core.workspace = true -meter-macros.workspace = true diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 50111dd39113..1efd757ee452 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -90,7 +90,7 @@ impl Deleter { .convert(requests) .await?; - let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?; + let affected_rows = self.do_request(deletes, &ctx).await?; Ok(Output::AffectedRows(affected_rows as _)) } @@ -109,7 +109,7 @@ impl Deleter { .convert(request) .await?; - let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?; + let affected_rows = self.do_request(deletes, &ctx).await?; Ok(affected_rows as _) } } @@ -118,10 +118,9 @@ impl Deleter { async fn do_request( &self, requests: RegionDeleteRequests, - trace_id: u64, - span_id: u64, + ctx: &QueryContextRef, ) -> Result { - let header = RegionRequestHeader { trace_id, span_id }; + let header: RegionRequestHeader = ctx.as_ref().into(); let request_factory = RegionRequestFactory::new(header); let tasks = self diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index bd4af4aafb50..856baa962776 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -29,6 +29,7 @@ use common_query::Output; use common_telemetry::{error, info}; use datatypes::schema::Schema; use futures_util::future; +use meter_macros::write_meter; use metrics::counter; use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; @@ -104,7 +105,7 @@ impl Inserter { .convert(requests) .await?; - let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?; + let affected_rows = self.do_request(inserts, &ctx).await?; Ok(Output::AffectedRows(affected_rows as _)) } @@ -126,7 +127,7 @@ impl Inserter { .convert(request) .await?; - let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?; + let affected_rows = self.do_request(inserts, &ctx).await?; Ok(affected_rows as _) } @@ -140,7 +141,7 @@ impl Inserter { .convert(insert) .await?; - let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?; + let affected_rows = self.do_request(inserts, ctx).await?; Ok(Output::AffectedRows(affected_rows as _)) } } @@ -149,10 +150,10 @@ impl Inserter { async fn do_request( &self, requests: RegionInsertRequests, - trace_id: u64, - span_id: u64, + ctx: &QueryContextRef, ) -> Result { - let header = RegionRequestHeader { trace_id, span_id }; + write_meter!(ctx.current_catalog(), ctx.current_schema(), requests); + let header: RegionRequestHeader = ctx.as_ref().into(); let request_factory = RegionRequestFactory::new(header); let tasks = self diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index cf7afa26dae7..ce07b9e5dde2 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -26,6 +26,7 @@ use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream, }; +use common_telemetry::trace_id; use datafusion::physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, }; @@ -35,7 +36,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; -use greptime_proto::v1::region::QueryRequest; +use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader}; use snafu::ResultExt; use store_api::storage::RegionId; @@ -149,13 +150,16 @@ impl MergeScanExec { }) } - pub fn to_stream(&self, _context: Arc) -> Result { + pub fn to_stream(&self, context: Arc) -> Result { let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); let schema = Self::arrow_schema_to_schema(self.schema())?; + let dbname = context.task_id().unwrap_or_default(); + let trace_id = trace_id().unwrap_or_default(); + let stream = Box::pin(stream!({ let _finish_timer = metric.finish_time().timer(); let mut ready_timer = metric.ready_time().timer(); @@ -163,7 +167,11 @@ impl MergeScanExec { for region_id in regions { let request = QueryRequest { - header: None, + header: Some(RegionRequestHeader { + trace_id, + span_id: 0, + dbname: dbname.clone(), + }), region_id: region_id.into(), plan: substrait_plan.clone(), }; diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index 79c6ec7f069f..29eb2171c20e 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -39,10 +39,10 @@ impl QueryEngineContext { } pub fn build_task_ctx(&self) -> Arc { - let task_id = self.query_ctx.trace_id().to_string(); + let dbname = self.query_ctx.get_db_string(); let state = &self.state; Arc::new(TaskContext::new( - Some(task_id), + Some(dbname), state.session_id().to_string(), state.config().clone(), state.scalar_functions().clone(), diff --git a/src/session/Cargo.toml b/src/session/Cargo.toml index 623234c3ff3f..0f7af500856a 100644 --- a/src/session/Cargo.toml +++ b/src/session/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true testing = [] [dependencies] +api.workspace = true arc-swap = "1.5" auth.workspace = true common-catalog = { workspace = true } diff --git a/src/session/src/context.rs b/src/session/src/context.rs index bfdac8f7725f..4edb639f5795 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -11,11 +11,11 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::sync::Arc; +use api::v1::region::RegionRequestHeader; use arc_swap::ArcSwap; use auth::UserInfoRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -37,6 +37,7 @@ pub struct QueryContext { time_zone: Option, sql_dialect: Box, trace_id: u64, + span_id: u64, } impl Display for QueryContext { @@ -50,6 +51,31 @@ impl Display for QueryContext { } } +impl From<&RegionRequestHeader> for QueryContext { + fn from(value: &RegionRequestHeader) -> Self { + let (catalog, schema) = parse_catalog_and_schema_from_db_string(&value.dbname); + QueryContext { + current_catalog: catalog.to_string(), + current_schema: schema.to_string(), + current_user: Default::default(), + time_zone: Default::default(), + sql_dialect: Box::new(GreptimeDbDialect {}), + trace_id: value.trace_id, + span_id: value.span_id, + } + } +} + +impl From<&QueryContext> for RegionRequestHeader { + fn from(value: &QueryContext) -> Self { + RegionRequestHeader { + trace_id: value.trace_id, + span_id: value.span_id, + dbname: value.get_db_string(), + } + } +} + impl QueryContext { pub fn arc() -> QueryContextRef { QueryContextBuilder::default().build() @@ -120,6 +146,11 @@ impl QueryContext { pub fn trace_id(&self) -> u64 { self.trace_id } + + #[inline] + pub fn span_id(&self) -> u64 { + self.span_id + } } impl QueryContextBuilder { @@ -139,6 +170,7 @@ impl QueryContextBuilder { .sql_dialect .unwrap_or_else(|| Box::new(GreptimeDbDialect {})), trace_id: self.trace_id.unwrap_or_else(common_telemetry::gen_trace_id), + span_id: self.span_id.unwrap_or_default(), }) }