Skip to content

Commit

Permalink
feat: pass QueryContext to FlowRequestHeader (#3878)
Browse files Browse the repository at this point in the history
* feat: pass `QueryContext` to `DdlTaskRequest`

* feat: pass `QueryContext` to `FlowRequestHeader`

* chore: fmt toml
  • Loading branch information
WenyXu authored May 9, 2024
1 parent a6a702d commit 0e05f85
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 68 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "47af36e2b218bdb09fa3a84a31999245152db2ee" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
Expand Down
18 changes: 15 additions & 3 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ mod metadata;
use std::collections::BTreeMap;

use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest};
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
Expand All @@ -40,7 +41,7 @@ use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::FlowId;
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateFlowTask;
use crate::rpc::ddl::{CreateFlowTask, QueryContext};
use crate::{metrics, ClusterId};

/// The procedure of flow creation.
Expand All @@ -53,7 +54,12 @@ impl CreateFlowProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";

/// Returns a new [CreateFlowProcedure].
pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self {
pub fn new(
cluster_id: ClusterId,
task: CreateFlowTask,
query_context: QueryContext,
context: DdlContext,
) -> Self {
Self {
context,
data: CreateFlowData {
Expand All @@ -63,6 +69,7 @@ impl CreateFlowProcedure {
peers: vec![],
source_table_ids: vec![],
state: CreateFlowState::CreateMetadata,
query_context,
},
}
}
Expand All @@ -88,6 +95,10 @@ impl CreateFlowProcedure {
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(self.data.query_context.clone().into()),
}),
body: Some(PbFlowRequest::Create((&self.data).into())),
};
create_flow.push(async move {
Expand Down Expand Up @@ -187,6 +198,7 @@ pub struct CreateFlowData {
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
pub(crate) query_context: QueryContext,
}

impl From<&CreateFlowData> for CreateRequest {
Expand Down
16 changes: 12 additions & 4 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::rpc::ddl::DdlTask::{
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask,
DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -328,9 +328,10 @@ impl DdlManager {
&self,
cluster_id: ClusterId,
create_flow: CreateFlowTask,
query_context: QueryContext,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateFlowProcedure::new(cluster_id, create_flow, context);
let procedure = CreateFlowProcedure::new(cluster_id, create_flow, query_context, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
Expand Down Expand Up @@ -600,9 +601,10 @@ async fn handle_create_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
create_flow_task: CreateFlowTask,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let (id, output) = ddl_manager
.submit_create_flow_task(cluster_id, create_flow_task.clone())
.submit_create_flow_task(cluster_id, create_flow_task.clone(), query_context)
.await?;

let procedure_id = id.to_string();
Expand Down Expand Up @@ -705,7 +707,13 @@ impl ProcedureExecutor for DdlManager {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}
CreateFlow(create_flow_task) => {
handle_create_flow_task(self, cluster_id, create_flow_task).await
handle_create_flow_task(
self,
cluster_id,
create_flow_task,
request.query_context.into(),
)
.await
}
}
}
Expand Down
42 changes: 41 additions & 1 deletion src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use api::v1::meta::{
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr,
DropTableExpr, TruncateTableExpr,
DropTableExpr, QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
Expand Down Expand Up @@ -195,6 +196,7 @@ impl TryFrom<Task> for DdlTask {

#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
pub query_context: QueryContextRef,
pub task: DdlTask,
}

Expand Down Expand Up @@ -238,6 +240,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {

Ok(Self {
header: None,
query_context: Some((*request.query_context).clone().into()),
task: Some(task),
})
}
Expand Down Expand Up @@ -853,6 +856,43 @@ impl From<DropFlowTask> for PbDropFlowTask {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryContext {
current_catalog: String,
current_schema: String,
timezone: String,
extensions: HashMap<String, String>,
}

impl From<QueryContextRef> for QueryContext {
fn from(query_context: QueryContextRef) -> Self {
QueryContext {
current_catalog: query_context.current_catalog().to_string(),
current_schema: query_context.current_schema().to_string(),
timezone: query_context.timezone().to_string(),
extensions: query_context.extensions(),
}
}
}

impl From<QueryContext> for PbQueryContext {
fn from(
QueryContext {
current_catalog,
current_schema,
timezone,
extensions,
}: QueryContext,
) -> Self {
PbQueryContext {
current_catalog,
current_schema,
timezone,
extensions,
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
16 changes: 11 additions & 5 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,31 +111,37 @@ impl GrpcQueryHandler for Instance {
DdlExpr::CreateTable(mut expr) => {
let _ = self
.statement_executor
.create_table_inner(&mut expr, None, &ctx)
.create_table_inner(&mut expr, None, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::Alter(expr) => self.statement_executor.alter_table_inner(expr).await?,
DdlExpr::Alter(expr) => {
self.statement_executor
.alter_table_inner(expr, ctx.clone())
.await?
}
DdlExpr::CreateDatabase(expr) => {
self.statement_executor
.create_database(
ctx.current_catalog(),
&expr.schema_name,
expr.create_if_not_exists,
ctx.clone(),
)
.await?
}
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor
.drop_table(table_name, expr.drop_if_exists)
.drop_table(table_name, expr.drop_if_exists, ctx.clone())
.await?
}
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor.truncate_table(table_name).await?
self.statement_executor
.truncate_table(table_name, ctx.clone())
.await?
}
DdlExpr::CreateFlow(_) => {
unimplemented!()
Expand Down
18 changes: 16 additions & 2 deletions src/meta-srv/src/service/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{
Expand Down Expand Up @@ -56,10 +57,20 @@ impl procedure_service_server::ProcedureService for Metasrv {
}

async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
let PbDdlTaskRequest { header, task, .. } = request.into_inner();
let PbDdlTaskRequest {
header,
query_context,
task,
..
} = request.into_inner();

let header = header.context(error::MissingRequestHeaderSnafu)?;
let cluster_id = header.cluster_id;
let query_context = query_context
.context(error::MissingRequiredParameterSnafu {
param: "query_context",
})?
.into();
let task: DdlTask = task
.context(error::MissingRequiredParameterSnafu { param: "task" })?
.try_into()
Expand All @@ -72,7 +83,10 @@ impl procedure_service_server::ProcedureService for Metasrv {
cluster_id: Some(cluster_id),
tracing_context: Some(header.tracing_context),
},
SubmitDdlTaskRequest { task },
SubmitDdlTaskRequest {
query_context: Arc::new(query_context),
task,
},
)
.await
.context(error::SubmitDdlTaskSnafu)?
Expand Down
Loading

0 comments on commit 0e05f85

Please sign in to comment.