diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index a79ad0041b4f3..9e409f057807a 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -361,7 +361,3 @@ message PlanFragment { PlanNode root = 1; ExchangeInfo exchange_info = 2; } - -message CapturedContext { - optional string time_zone = 1; -} diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index fd7988413aeed..b57414d57dfba 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -878,3 +878,8 @@ message StreamFragmentGraph { // If none, default parallelism will be applied. Parallelism parallelism = 6; } + +// Provide statement-local context, e.g. session info like time zone, for runtime execution. +message CapturedContext { + string time_zone = 1; +} diff --git a/proto/task_service.proto b/proto/task_service.proto index b598993181488..af736ddcd76ad 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -40,7 +40,7 @@ message CreateTaskRequest { batch_plan.PlanFragment plan = 2; common.BatchQueryEpoch epoch = 3; map tracing_context = 4; - batch_plan.CapturedContext captured_context = 5; + stream_plan.CapturedContext captured_context = 5; } message CancelTaskRequest { @@ -64,7 +64,7 @@ message ExecuteRequest { batch_plan.PlanFragment plan = 2; common.BatchQueryEpoch epoch = 3; map tracing_context = 4; - batch_plan.CapturedContext captured_context = 5; + stream_plan.CapturedContext captured_context = 5; } service TaskService { diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs index e0233a643970e..57905ab3991b4 100644 --- a/src/batch/src/execution/grpc_exchange.rs +++ b/src/batch/src/execution/grpc_exchange.rs @@ -51,7 +51,7 @@ impl GrpcExchangeSource { plan: plan.plan, epoch: plan.epoch, tracing_context: plan.tracing_context, - captured_context: Some(capture_context()), + captured_context: Some(capture_context()?), }; client.execute(execute_request).await? } diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 93298eeca21ae..e75ec1f3e803a 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -28,8 +28,9 @@ use risingwave_common::util::panic::FutureCatchUnwindExt; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_common::util::tracing::TracingContext; use risingwave_expr::captured_context_scope; -use risingwave_pb::batch_plan::{CapturedContext, PbTaskId, PbTaskOutputId, PlanFragment}; +use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment}; use risingwave_pb::common::BatchQueryEpoch; +use risingwave_pb::stream_plan::CapturedContext; use risingwave_pb::task_service::task_info_response::TaskStatus; use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse}; use risingwave_pb::PbFieldNotFound; diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index 8f77b03c1b95d..5adff41d38c44 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -23,8 +23,9 @@ use risingwave_common::config::BatchConfig; use risingwave_common::memory::MemoryContext; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_common::util::tracing::TracingContext; -use risingwave_pb::batch_plan::{CapturedContext, PbTaskId, PbTaskOutputId, PlanFragment}; +use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment}; use risingwave_pb::common::BatchQueryEpoch; +use risingwave_pb::stream_plan::CapturedContext; use risingwave_pb::task_service::task_info_response::TaskStatus; use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse}; use tokio::sync::mpsc::Sender; diff --git a/src/expr/core/src/captured_context.rs b/src/expr/core/src/captured_context.rs index 6273df6cddebb..11543b5ed00c7 100644 --- a/src/expr/core/src/captured_context.rs +++ b/src/expr/core/src/captured_context.rs @@ -12,18 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_expr::define_context; -use risingwave_pb::batch_plan::CapturedContext; +use risingwave_expr::{define_context, Result as ExprResult}; +use risingwave_pb::stream_plan::CapturedContext; // For all execution mode. define_context! { pub TIME_ZONE: String, } -pub fn capture_context() -> CapturedContext { - let mut ctx = CapturedContext::default(); - let _ = TIME_ZONE::try_with(|time_zone| { - ctx.time_zone = Some(time_zone.to_string()); - }); - ctx +pub fn capture_context() -> ExprResult { + let ctx = TIME_ZONE::try_with(|time_zone| CapturedContext { + time_zone: time_zone.to_owned(), + })?; + Ok(ctx) } diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs index 1712c5f9d0bfb..f4f7bef9a99c8 100644 --- a/src/expr/macro/src/lib.rs +++ b/src/expr/macro/src/lib.rs @@ -664,8 +664,7 @@ pub fn captured_context_scope(input: TokenStream) -> TokenStream { body = quote! { async { use risingwave_expr::captured_context::#local_key_name; - let #field = ctx.#field.unwrap_or_default(); - #local_key_name::scope(#field.to_owned(), #body).await + #local_key_name::scope(ctx.#field.to_owned(), #body).await } }; }); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index cd6dd9fd20abb..78048db949f22 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -39,11 +39,11 @@ use risingwave_connector::source::SplitMetaData; use risingwave_expr::captured_context_scope; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ - CapturedContext, DistributedLookupJoinNode, ExchangeNode, ExchangeSource, - MergeSortExchangeNode, PlanFragment, PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb, - TaskOutputId, + DistributedLookupJoinNode, ExchangeNode, ExchangeSource, MergeSortExchangeNode, PlanFragment, + PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb, TaskOutputId, }; use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode}; +use risingwave_pb::stream_plan::CapturedContext; use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse}; use risingwave_rpc_client::ComputeClientPoolRef; use tokio::spawn; @@ -633,8 +633,7 @@ impl StageRunner { async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> { let captured_context = CapturedContext { - // TODO(kexiang): We go through the plan to make sure only the neccessay context is captured. - time_zone: Some(self.ctx.session().config().timezone().to_owned()), + time_zone: self.ctx.session().config().timezone().to_owned(), }; // If root, we execute it locally. if !self.is_root_stage() { diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index 14f0c6f6cc627..9c28791d82e98 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -21,7 +21,7 @@ use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::tracing::TracingContext; -use risingwave_pb::batch_plan::{CapturedContext, PlanFragment, TaskId, TaskOutputId}; +use risingwave_pb::batch_plan::{PlanFragment, TaskId, TaskOutputId}; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::compute::config_service_client::ConfigServiceClient; use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse}; @@ -31,6 +31,7 @@ use risingwave_pb::monitor_service::{ ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse, }; +use risingwave_pb::stream_plan::CapturedContext; use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient; use risingwave_pb::task_service::task_service_client::TaskServiceClient; use risingwave_pb::task_service::{