fix: carry dbname from frontend to datanode (#2520)
* chore: add dbname in region request header for tracking purpose

* chore: fix handle read

* chore: add write meter

* chore: add meter-core to dep

* chore: add converter between RegionRequestHeader and QueryContext & update proto version
shuiyisong authored Oct 8, 2023
1 parent a680133 commit 0679178
Showing 10 changed files with 71 additions and 26 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "693128abe9adc70ba636010a172c9da55b206bba" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1f1dd532a111e3834cc3019c5605e2993ffb9dc3" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
11 changes: 8 additions & 3 deletions src/datanode/src/region_server.rs
@@ -44,7 +44,7 @@ use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
-use session::context::QueryContext;
+use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
@@ -285,12 +285,17 @@ impl RegionServerInner {
        // TODO(ruihang): add metrics and set trace id

        let QueryRequest {
-            header: _,
+            header,
            region_id,
            plan,
        } = request;
        let region_id = RegionId::from_u64(region_id);

+        let ctx: QueryContextRef = header
+            .as_ref()
+            .map(|h| Arc::new(h.into()))
+            .unwrap_or_else(|| QueryContextBuilder::default().build());
+
        // build dummy catalog list
        let engine = self
            .region_map
@@ -306,7 +311,7 @@
            .context(DecodeLogicalPlanSnafu)?;
        let result = self
            .query_engine
-            .execute(logical_plan.into(), QueryContext::arc())
+            .execute(logical_plan.into(), ctx)
            .await
            .context(ExecuteLogicalPlanSnafu)?;

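Note on the datanode change: `handle_read` previously dropped the request header on the floor; it now rebuilds a `QueryContext` from the header when one is present and falls back to a default context otherwise. Below is a minimal, self-contained sketch of that fallback pattern, with simplified stand-in types rather than the real greptime-proto and session definitions:

```rust
use std::sync::Arc;

// Simplified stand-ins for the real RegionRequestHeader / QueryContext types.
#[derive(Default)]
struct RegionRequestHeader {
    dbname: String,
}

#[derive(Default, Debug)]
struct QueryContext {
    db: String,
}

impl From<&RegionRequestHeader> for QueryContext {
    fn from(h: &RegionRequestHeader) -> Self {
        QueryContext { db: h.dbname.clone() }
    }
}

// Mirrors the map/unwrap_or_else fallback in handle_read above.
fn context_from_header(header: Option<&RegionRequestHeader>) -> Arc<QueryContext> {
    header
        .map(|h| Arc::new(h.into()))
        .unwrap_or_else(|| Arc::new(QueryContext::default()))
}

fn main() {
    let hdr = RegionRequestHeader { dbname: "greptime-public".into() };
    assert_eq!(context_from_header(Some(&hdr)).db, "greptime-public");
    assert_eq!(context_from_header(None).db, "");
}
```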
6 changes: 2 additions & 4 deletions src/operator/Cargo.toml
@@ -35,6 +35,8 @@ file-engine = { workspace = true }
futures = "0.3"
futures-util.workspace = true
meta-client = { workspace = true }
+meter-core.workspace = true
+meter-macros.workspace = true
metrics.workspace = true
object-store = { workspace = true }
partition = { workspace = true }
@@ -53,7 +55,3 @@ substrait = { workspace = true }
table = { workspace = true }
tokio.workspace = true
tonic.workspace = true
-
-[dev-dependencies]
-meter-core.workspace = true
-meter-macros.workspace = true
9 changes: 4 additions & 5 deletions src/operator/src/delete.rs
@@ -90,7 +90,7 @@ impl Deleter {
            .convert(requests)
            .await?;

-        let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
+        let affected_rows = self.do_request(deletes, &ctx).await?;
        Ok(Output::AffectedRows(affected_rows as _))
    }

@@ -109,7 +109,7 @@ impl Deleter {
            .convert(request)
            .await?;

-        let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
+        let affected_rows = self.do_request(deletes, &ctx).await?;
        Ok(affected_rows as _)
    }
}
@@ -118,10 +118,9 @@ impl Deleter {
    async fn do_request(
        &self,
        requests: RegionDeleteRequests,
-        trace_id: u64,
-        span_id: u64,
+        ctx: &QueryContextRef,
    ) -> Result<AffectedRows> {
-        let header = RegionRequestHeader { trace_id, span_id };
+        let header: RegionRequestHeader = ctx.as_ref().into();
        let request_factory = RegionRequestFactory::new(header);

        let tasks = self
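The `do_request` signature now takes the whole query context instead of bare `trace_id`/`span_id` values; the header is derived once via the new `From<&QueryContext>` impl (see src/session/src/context.rs below) and handed to `RegionRequestFactory`, which stamps it onto every per-region request. A sketch of that factory pattern, using stand-in types rather than the real operator definitions:

```rust
#[derive(Clone, Debug)]
struct RegionRequestHeader {
    trace_id: u64,
    span_id: u64,
    dbname: String,
}

#[derive(Debug)]
struct RegionRequest {
    header: RegionRequestHeader,
    region_id: u64,
}

struct RegionRequestFactory {
    header: RegionRequestHeader,
}

impl RegionRequestFactory {
    fn new(header: RegionRequestHeader) -> Self {
        Self { header }
    }

    // Every request produced by the factory carries the same header, so the
    // dbname and trace ids survive the fan-out to individual regions.
    fn build_request(&self, region_id: u64) -> RegionRequest {
        RegionRequest {
            header: self.header.clone(),
            region_id,
        }
    }
}

fn main() {
    let header = RegionRequestHeader {
        trace_id: 7,
        span_id: 0,
        dbname: "greptime-public".to_string(),
    };
    let factory = RegionRequestFactory::new(header);
    let requests: Vec<_> = (0..3).map(|id| factory.build_request(id)).collect();
    println!("{requests:?}");
}
```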
13 changes: 7 additions & 6 deletions src/operator/src/insert.rs
@@ -29,6 +29,7 @@ use common_query::Output;
use common_telemetry::{error, info};
use datatypes::schema::Schema;
use futures_util::future;
+use meter_macros::write_meter;
use metrics::counter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
@@ -104,7 +105,7 @@ impl Inserter {
            .convert(requests)
            .await?;

-        let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?;
+        let affected_rows = self.do_request(inserts, &ctx).await?;
        Ok(Output::AffectedRows(affected_rows as _))
    }

@@ -126,7 +127,7 @@ impl Inserter {
            .convert(request)
            .await?;

-        let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?;
+        let affected_rows = self.do_request(inserts, &ctx).await?;
        Ok(affected_rows as _)
    }

@@ -140,7 +141,7 @@
            .convert(insert)
            .await?;

-        let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?;
+        let affected_rows = self.do_request(inserts, ctx).await?;
        Ok(Output::AffectedRows(affected_rows as _))
    }
}
@@ -149,10 +150,10 @@ impl Inserter {
    async fn do_request(
        &self,
        requests: RegionInsertRequests,
-        trace_id: u64,
-        span_id: u64,
+        ctx: &QueryContextRef,
    ) -> Result<AffectedRows> {
-        let header = RegionRequestHeader { trace_id, span_id };
+        write_meter!(ctx.current_catalog(), ctx.current_schema(), requests);
+        let header: RegionRequestHeader = ctx.as_ref().into();
        let request_factory = RegionRequestFactory::new(header);

        let tasks = self
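The insert path additionally records write traffic against the request's catalog and schema via `write_meter!` before the fan-out; routing all three public entry points through `do_request` gives metering a single choke point. The sketch below is a hypothetical stand-in for such a meter, not the actual meter-core/meter-macros API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical meter: total bytes written, keyed by (catalog, schema).
static METER: Mutex<Option<HashMap<(String, String), u64>>> = Mutex::new(None);

fn record_write(catalog: &str, schema: &str, bytes: u64) {
    let mut guard = METER.lock().unwrap();
    let map = guard.get_or_insert_with(HashMap::new);
    *map.entry((catalog.to_string(), schema.to_string())).or_insert(0) += bytes;
}

fn main() {
    record_write("greptime", "public", 1024);
    record_write("greptime", "public", 512);
    println!("{:?}", METER.lock().unwrap().as_ref().unwrap());
}
```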
14 changes: 11 additions & 3 deletions src/query/src/dist_plan/merge_scan.rs
@@ -26,6 +26,7 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
};
+use common_telemetry::trace_id;
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
@@ -35,7 +36,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
-use greptime_proto::v1::region::QueryRequest;
+use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
use snafu::ResultExt;
use store_api::storage::RegionId;

@@ -149,21 +150,28 @@ impl MergeScanExec {
        })
    }

-    pub fn to_stream(&self, _context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
+    pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
        let substrait_plan = self.substrait_plan.to_vec();
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let schema = Self::arrow_schema_to_schema(self.schema())?;

+        let dbname = context.task_id().unwrap_or_default();
+        let trace_id = trace_id().unwrap_or_default();
+
        let stream = Box::pin(stream!({
            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

            for region_id in regions {
                let request = QueryRequest {
-                    header: None,
+                    header: Some(RegionRequestHeader {
+                        trace_id,
+                        span_id: 0,
+                        dbname: dbname.clone(),
+                    }),
                    region_id: region_id.into(),
                    plan: substrait_plan.clone(),
                };
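On the frontend side, `MergeScanExec` now has real values to put in the header: the database name is read back from the `TaskContext` (see the next file) and the trace id from common-telemetry, and both are cloned into the `QueryRequest` sent to each region. A condensed sketch of that fan-out, with local stand-ins for the generated greptime-proto types:

```rust
// Local stand-ins for the generated greptime-proto request types.
#[derive(Clone, Debug, Default)]
struct RegionRequestHeader {
    trace_id: u64,
    span_id: u64,
    dbname: String,
}

#[derive(Debug)]
struct QueryRequest {
    header: Option<RegionRequestHeader>,
    region_id: u64,
    plan: Vec<u8>,
}

// One populated header, cloned into the request for every region, so each
// datanode can recover both the tracing ids and the target database.
fn build_requests(regions: &[u64], dbname: &str, trace_id: u64, plan: &[u8]) -> Vec<QueryRequest> {
    regions
        .iter()
        .map(|&region_id| QueryRequest {
            header: Some(RegionRequestHeader {
                trace_id,
                span_id: 0,
                dbname: dbname.to_string(),
            }),
            region_id,
            plan: plan.to_vec(),
        })
        .collect()
}

fn main() {
    let requests = build_requests(&[1, 2, 3], "greptime-public", 42, b"substrait-plan");
    assert_eq!(requests.len(), 3);
    println!("{requests:#?}");
}
```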
4 changes: 2 additions & 2 deletions src/query/src/query_engine/context.rs
@@ -39,10 +39,10 @@ impl QueryEngineContext {
    }

    pub fn build_task_ctx(&self) -> Arc<TaskContext> {
-        let task_id = self.query_ctx.trace_id().to_string();
+        let dbname = self.query_ctx.get_db_string();
        let state = &self.state;
        Arc::new(TaskContext::new(
-            Some(task_id),
+            Some(dbname),
            state.session_id().to_string(),
            state.config().clone(),
            state.scalar_functions().clone(),
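This is the sending side of the dbname channel. DataFusion's `TaskContext` has no dedicated field for a database name, so `build_task_ctx` rides the db string in the `task_id` slot that previously carried the trace id, and `MergeScanExec` reads it back with `task_id()`. A stand-in sketch of that handshake (local types, not the DataFusion API):

```rust
// Local stand-in; the real TaskContext is DataFusion's.
#[derive(Debug)]
struct TaskContext {
    task_id: Option<String>,
    session_id: String,
}

impl TaskContext {
    fn new(task_id: Option<String>, session_id: String) -> Self {
        Self { task_id, session_id }
    }

    fn task_id(&self) -> Option<String> {
        self.task_id.clone()
    }
}

// Sender: stash the db string where the trace id used to ride.
fn build_task_ctx(dbname: String, session_id: String) -> TaskContext {
    TaskContext::new(Some(dbname), session_id)
}

fn main() {
    let ctx = build_task_ctx("greptime-public".into(), "session-1".into());
    // Receiver (MergeScanExec::to_stream): read the dbname back out.
    let dbname = ctx.task_id().unwrap_or_default();
    assert_eq!(dbname, "greptime-public");
    println!("session {} db {dbname}", ctx.session_id);
}
```

Reusing `task_id` keeps the change small at the cost of overloading a field whose name no longer matches its content; the trace id now travels through common-telemetry instead.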
1 change: 1 addition & 0 deletions src/session/Cargo.toml
@@ -8,6 +8,7 @@ license.workspace = true
testing = []

[dependencies]
+api.workspace = true
arc-swap = "1.5"
auth.workspace = true
common-catalog = { workspace = true }
34 changes: 33 additions & 1 deletion src/session/src/context.rs
@@ -11,11 +11,11 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
use std::sync::Arc;

+use api::v1::region::RegionRequestHeader;
use arc_swap::ArcSwap;
use auth::UserInfoRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -37,6 +37,7 @@ pub struct QueryContext {
    time_zone: Option<TimeZone>,
    sql_dialect: Box<dyn Dialect + Send + Sync>,
    trace_id: u64,
+    span_id: u64,
}

impl Display for QueryContext {
@@ -50,6 +51,31 @@
    }
}

+impl From<&RegionRequestHeader> for QueryContext {
+    fn from(value: &RegionRequestHeader) -> Self {
+        let (catalog, schema) = parse_catalog_and_schema_from_db_string(&value.dbname);
+        QueryContext {
+            current_catalog: catalog.to_string(),
+            current_schema: schema.to_string(),
+            current_user: Default::default(),
+            time_zone: Default::default(),
+            sql_dialect: Box::new(GreptimeDbDialect {}),
+            trace_id: value.trace_id,
+            span_id: value.span_id,
+        }
+    }
+}
+
+impl From<&QueryContext> for RegionRequestHeader {
+    fn from(value: &QueryContext) -> Self {
+        RegionRequestHeader {
+            trace_id: value.trace_id,
+            span_id: value.span_id,
+            dbname: value.get_db_string(),
+        }
+    }
+}
+
impl QueryContext {
    pub fn arc() -> QueryContextRef {
        QueryContextBuilder::default().build()
@@ -120,6 +146,11 @@ impl QueryContext {
    pub fn trace_id(&self) -> u64 {
        self.trace_id
    }
+
+    #[inline]
+    pub fn span_id(&self) -> u64 {
+        self.span_id
+    }
}

impl QueryContextBuilder {
@@ -139,6 +170,7 @@ impl QueryContextBuilder {
                .sql_dialect
                .unwrap_or_else(|| Box::new(GreptimeDbDialect {})),
            trace_id: self.trace_id.unwrap_or_else(common_telemetry::gen_trace_id),
+            span_id: self.span_id.unwrap_or_default(),
        })
    }

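The two `From` impls above are the core of the commit: the frontend flattens its `QueryContext` into a `RegionRequestHeader`, and the datanode reconstructs an equivalent context on arrival; fields that do not cross the wire (user, time zone, dialect) are rebuilt with defaults. A self-contained round-trip sketch follows; the types are simplified, and the real dbname encoding uses `build_db_string`/`parse_catalog_and_schema_from_db_string` from common-catalog, which special-case the default catalog:

```rust
#[derive(Debug, Default, Clone)]
struct RegionRequestHeader {
    trace_id: u64,
    span_id: u64,
    dbname: String,
}

#[derive(Debug, Clone)]
struct QueryContext {
    current_catalog: String,
    current_schema: String,
    trace_id: u64,
    span_id: u64,
}

impl QueryContext {
    // Simplified "<catalog>-<schema>" encoding; the real helper omits the
    // default catalog prefix.
    fn get_db_string(&self) -> String {
        format!("{}-{}", self.current_catalog, self.current_schema)
    }
}

impl From<&QueryContext> for RegionRequestHeader {
    fn from(ctx: &QueryContext) -> Self {
        RegionRequestHeader {
            trace_id: ctx.trace_id,
            span_id: ctx.span_id,
            dbname: ctx.get_db_string(),
        }
    }
}

impl From<&RegionRequestHeader> for QueryContext {
    fn from(header: &RegionRequestHeader) -> Self {
        let (catalog, schema) = header
            .dbname
            .split_once('-')
            .unwrap_or(("greptime", header.dbname.as_str()));
        QueryContext {
            current_catalog: catalog.to_string(),
            current_schema: schema.to_string(),
            trace_id: header.trace_id,
            span_id: header.span_id,
        }
    }
}

fn main() {
    let ctx = QueryContext {
        current_catalog: "greptime".into(),
        current_schema: "public".into(),
        trace_id: 42,
        span_id: 0,
    };
    let header = RegionRequestHeader::from(&ctx);
    let roundtrip = QueryContext::from(&header);
    assert_eq!(roundtrip.current_schema, ctx.current_schema);
    assert_eq!(roundtrip.trace_id, 42);
    println!("{header:?}");
}
```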
