diff --git a/Cargo.lock b/Cargo.lock
index 1a3ebbde5a815..bd64f2fb19b37 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7911,7 +7911,6 @@ dependencies = [
  "auto_impl",
  "bk-tree",
  "bytes",
- "chrono",
  "clap",
  "downcast-rs",
  "dyn-clone",
diff --git a/e2e_test/batch/basic/local/make_timestamptz.slt.part b/e2e_test/batch/basic/make_timestamptz.slt.part
similarity index 90%
rename from e2e_test/batch/basic/local/make_timestamptz.slt.part
rename to e2e_test/batch/basic/make_timestamptz.slt.part
index 4109104982f28..99f5d1369327c 100644
--- a/e2e_test/batch/basic/local/make_timestamptz.slt.part
+++ b/e2e_test/batch/basic/make_timestamptz.slt.part
@@ -31,6 +31,20 @@
 SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33);
 ----
 1973-07-15 08:15:55.330-04:00
 
+statement ok
+create table ttz(tstz timestamptz);
+
+statement ok
+insert into ttz values(make_timestamptz(1973, 06, 25, 08, 15, 55.33));
+
+query TT
+select * from ttz;
+----
+1973-06-25 08:15:55.330-04:00
+
+statement ok
+drop table ttz;
+
 query error Invalid parameter time_zone: 'Nehwon/Lankhmar' is not a valid timezone
 SELECT make_timestamptz(1910, 12, 24, 0, 0, 0, 'Nehwon/Lankhmar');
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 4f1eccfab8666..5117446c1783d 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -157,3 +157,8 @@ message Cardinality {
   // Unbounded if not set.
   optional uint64 hi = 2;
 }
+
+// Provide statement-local context, e.g. session info like time zone, for execution.
+message CapturedExecutionContext {
+  string time_zone = 1;
+}
diff --git a/proto/task_service.proto b/proto/task_service.proto
index d29abd14b137e..ec72be346750f 100644
--- a/proto/task_service.proto
+++ b/proto/task_service.proto
@@ -5,6 +5,7 @@ package task_service;
 import "batch_plan.proto";
 import "common.proto";
 import "data.proto";
+import "plan_common.proto";
 import "stream_plan.proto";
 
 option java_package = "com.risingwave.proto";
@@ -40,6 +41,7 @@ message CreateTaskRequest {
   batch_plan.PlanFragment plan = 2;
   common.BatchQueryEpoch epoch = 3;
   map<string, string> tracing_context = 4;
+  plan_common.CapturedExecutionContext captured_execution_context = 5;
 }
 
 message CancelTaskRequest {
@@ -63,6 +65,7 @@ message ExecuteRequest {
   batch_plan.PlanFragment plan = 2;
   common.BatchQueryEpoch epoch = 3;
   map<string, string> tracing_context = 4;
+  plan_common.CapturedExecutionContext captured_execution_context = 5;
 }
 
 service TaskService {
diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs
index 71d1f758febff..bc5e109f8f67f 100644
--- a/src/batch/src/execution/grpc_exchange.rs
+++ b/src/batch/src/execution/grpc_exchange.rs
@@ -16,6 +16,7 @@ use std::fmt::{Debug, Formatter};
 
 use futures::StreamExt;
 use risingwave_common::array::DataChunk;
+use risingwave_expr::captured_execution_context::capture_execution_context;
 use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::{self, Plan};
 use risingwave_pb::batch_plan::TaskOutputId;
 use risingwave_pb::task_service::{ExecuteRequest, GetDataResponse};
@@ -50,6 +51,7 @@ impl GrpcExchangeSource {
                     plan: plan.plan,
                     epoch: plan.epoch,
                     tracing_context: plan.tracing_context,
+                    captured_execution_context: Some(capture_execution_context()?),
                 };
                 client.execute(execute_request).await?
             }
diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs
index fb60e352ec293..4395f948497ce 100644
--- a/src/batch/src/rpc/service/task_service.rs
+++ b/src/batch/src/rpc/service/task_service.rs
@@ -63,6 +63,7 @@ impl TaskService for BatchServiceImpl {
             plan,
             epoch,
             tracing_context,
+            captured_execution_context,
         } = request.into_inner();
 
         let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
@@ -79,6 +80,7 @@ impl TaskService for BatchServiceImpl {
                 ),
                 state_reporter,
                 TracingContext::from_protobuf(&tracing_context),
+                captured_execution_context.expect("no captured execution context found"),
             )
             .await;
         match res {
@@ -119,12 +121,15 @@ impl TaskService for BatchServiceImpl {
             plan,
             epoch,
             tracing_context,
+            captured_execution_context,
         } = req.into_inner();
 
         let task_id = task_id.expect("no task id found");
         let plan = plan.expect("no plan found").clone();
         let epoch = epoch.expect("no epoch found");
         let tracing_context = TracingContext::from_protobuf(&tracing_context);
+        let captured_execution_context =
+            captured_execution_context.expect("no captured execution context found");
 
         let context = ComputeNodeContext::new_for_local(self.env.clone());
         trace!(
@@ -135,7 +140,11 @@ impl TaskService for BatchServiceImpl {
         let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
         let task = Arc::new(task);
         let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
-        if let Err(e) = task.clone().async_execute(None, tracing_context).await {
+        if let Err(e) = task
+            .clone()
+            .async_execute(None, tracing_context, captured_execution_context)
+            .await
+        {
             error!(
                 "failed to build executors and trigger execution of Task {:?}: {}",
                 task_id, e
diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs
index 4fa26b4d5d69e..c9dc9f888a7e4 100644
--- a/src/batch/src/task/task_execution.rs
+++ b/src/batch/src/task/task_execution.rs
@@ -27,8 +27,10 @@ use risingwave_common::array::DataChunk;
 use risingwave_common::util::panic::FutureCatchUnwindExt;
 use risingwave_common::util::runtime::BackgroundShutdownRuntime;
 use risingwave_common::util::tracing::TracingContext;
+use risingwave_expr::captured_execution_context_scope;
 use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
 use risingwave_pb::common::BatchQueryEpoch;
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use risingwave_pb::task_service::task_info_response::TaskStatus;
 use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
 use risingwave_pb::PbFieldNotFound;
@@ -425,6 +427,7 @@ impl BatchTaskExecution {
         self: Arc<Self>,
         state_tx: Option<StateReporter>,
         tracing_context: TracingContext,
+        captured_execution_context: CapturedExecutionContext,
     ) -> Result<()> {
         let mut state_tx = state_tx;
         trace!(
@@ -433,14 +436,17 @@ impl BatchTaskExecution {
             serde_json::to_string_pretty(self.plan.get_root()?).unwrap()
         );
 
-        let exec = ExecutorBuilder::new(
-            self.plan.root.as_ref().unwrap(),
-            &self.task_id,
-            self.context.clone(),
-            self.epoch.clone(),
-            self.shutdown_rx.clone(),
+        let exec = captured_execution_context_scope!(
+            captured_execution_context.clone(),
+            ExecutorBuilder::new(
+                self.plan.root.as_ref().unwrap(),
+                &self.task_id,
+                self.context.clone(),
+                self.epoch.clone(),
+                self.shutdown_rx.clone(),
+            )
+            .build()
         )
-        .build()
         .await?;
 
         let sender = self.sender.clone();
@@ -472,9 +478,11 @@ impl BatchTaskExecution {
             // We should only pass a reference to the sender into the execution, because we
             // should only close it after the task error has been set.
-            t_1.run(exec, sender, state_tx.as_mut())
-                .instrument(span)
-                .await;
+            captured_execution_context_scope!(
+                captured_execution_context,
+                t_1.run(exec, sender, state_tx.as_mut()).instrument(span)
+            )
+            .await;
         };
 
         if let Some(batch_metrics) = batch_metrics {
diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs
index 8a297cd659640..e09809ccbee07 100644
--- a/src/batch/src/task/task_manager.rs
+++ b/src/batch/src/task/task_manager.rs
@@ -25,6 +25,7 @@ use risingwave_common::util::runtime::BackgroundShutdownRuntime;
 use risingwave_common::util::tracing::TracingContext;
 use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
 use risingwave_pb::common::BatchQueryEpoch;
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use risingwave_pb::task_service::task_info_response::TaskStatus;
 use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
 use tokio::sync::mpsc::Sender;
@@ -102,6 +103,7 @@ impl BatchManager {
         context: ComputeNodeContext,
         state_reporter: StateReporter,
         tracing_context: TracingContext,
+        captured_execution_context: CapturedExecutionContext,
     ) -> Result<()> {
         trace!("Received task id: {:?}, plan: {:?}", tid, plan);
         let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime())?;
@@ -128,11 +130,15 @@ impl BatchManager {
                 task_id,
             );
         };
-        task.async_execute(Some(state_reporter), tracing_context)
-            .await
-            .inspect_err(|_| {
-                self.cancel_task(&task_id.to_prost());
-            })?;
+        task.async_execute(
+            Some(state_reporter),
+            tracing_context,
+            captured_execution_context,
+        )
+        .await
+        .inspect_err(|_| {
+            self.cancel_task(&task_id.to_prost());
+        })?;
         ret
     }
@@ -151,6 +157,9 @@ impl BatchManager {
             ComputeNodeContext::for_test(),
             StateReporter::new_with_test(),
             TracingContext::none(),
+            CapturedExecutionContext {
+                time_zone: "UTC".to_string(),
+            },
         )
         .await
     }
diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml
index db3795b8a20ed..09c7b619b9fc7 100644
--- a/src/expr/core/Cargo.toml
+++ b/src/expr/core/Cargo.toml
@@ -45,6 +45,10 @@ risingwave_udf = { workspace = true }
 smallvec = "1"
 static_assertions = "1"
 thiserror = "1"
+tokio = { version = "0.2", package = "madsim-tokio", features = [
+    "rt",
+    "macros",
+] }
 tracing = "0.1"
 
 [target.'cfg(not(madsim))'.dependencies]
diff --git a/src/expr/core/src/captured_execution_context.rs b/src/expr/core/src/captured_execution_context.rs
new file mode 100644
index 0000000000000..800fc16be9cd2
--- /dev/null
+++ b/src/expr/core/src/captured_execution_context.rs
@@ -0,0 +1,26 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_expr::{define_context, Result as ExprResult};
+use risingwave_pb::plan_common::CapturedExecutionContext;
+
+// For all execution modes.
+define_context! {
+    pub TIME_ZONE: String,
+}
+
+pub fn capture_execution_context() -> ExprResult<CapturedExecutionContext> {
+    let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
+    Ok(CapturedExecutionContext { time_zone })
+}
diff --git a/src/expr/core/src/lib.rs b/src/expr/core/src/lib.rs
index a92e535946fa5..ff632e8cf46ce 100644
--- a/src/expr/core/src/lib.rs
+++ b/src/expr/core/src/lib.rs
@@ -25,6 +25,7 @@ extern crate self as risingwave_expr;
 
 pub mod aggregate;
+pub mod captured_execution_context;
 #[doc(hidden)]
 pub mod codegen;
 mod error;
diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs
index 51b9a20a75c46..f6a8a5667e46e 100644
--- a/src/expr/impl/src/lib.rs
+++ b/src/expr/impl/src/lib.rs
@@ -32,6 +32,7 @@
 #![feature(test)]
 #![feature(arc_unwrap_or_clone)]
 #![feature(iter_array_chunks)]
+#![feature(result_flattening)]
 
 mod aggregate;
 mod scalar;
diff --git a/src/frontend/src/expr/function_impl/make_timestamptz.rs b/src/expr/impl/src/scalar/make_timestamptz.rs
similarity index 75%
rename from src/frontend/src/expr/function_impl/make_timestamptz.rs
rename to src/expr/impl/src/scalar/make_timestamptz.rs
index 8d7ea62a696b3..90029b0e2dcbe 100644
--- a/src/frontend/src/expr/function_impl/make_timestamptz.rs
+++ b/src/expr/impl/src/scalar/make_timestamptz.rs
@@ -13,19 +13,11 @@
 // limitations under the License.
 
 use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
-use risingwave_common::types::{FloatExt, Timestamptz, F64};
+use risingwave_common::types::{FloatExt, Timestamp, Timestamptz, F64};
+use risingwave_expr::captured_execution_context::TIME_ZONE;
 use risingwave_expr::{capture_context, function, ExprError, Result};
 
-use super::context::TIME_ZONE;
-
-/// Just a wrapper to reuse the `map_err` logic.
-#[inline(always)]
-pub fn time_zone_err(inner_err: String) -> ExprError {
-    ExprError::InvalidParam {
-        name: "time_zone",
-        reason: inner_err.into(),
-    }
-}
+use crate::scalar::timestamptz::timestamp_at_time_zone;
 
 // year int, month int, day int, hour int, min int, sec double precision
 #[function("make_timestamptz(int4, int4, int4, int4, int4, float8) -> timestamptz")]
@@ -64,7 +56,6 @@ fn make_timestamptz_impl(
     min: i32,
     sec: F64,
 ) -> Result<Timestamptz> {
-    let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
     if !sec.is_finite() || sec.0.is_sign_negative() {
         return Err(ExprError::InvalidParam {
             name: "sec",
@@ -86,16 +77,6 @@ fn make_timestamptz_impl(
             reason: "invalid time".into(),
         })?,
     );
-    let date_time = naive_date_time
-        .and_local_timezone(time_zone)
-        .latest()
-        .ok_or_else(|| ExprError::InvalidParam {
-            name: "time_zone",
-            reason: format!(
-                "fail to interpret local timestamp \"{:?}\" in time zone \"{}\"",
-                naive_date_time, time_zone
-            )
-            .into(),
-        })?;
-    Ok(Timestamptz::from(date_time))
+
+    timestamp_at_time_zone(Timestamp(naive_date_time), time_zone)
 }
diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs
index ad859c20c6ce2..e99a71323b617 100644
--- a/src/expr/impl/src/scalar/mod.rs
+++ b/src/expr/impl/src/scalar/mod.rs
@@ -52,6 +52,7 @@ mod jsonb_info;
 mod jsonb_object;
 mod length;
 mod lower;
+mod make_timestamptz;
 mod md5;
 mod overlay;
 mod position;
diff --git a/src/expr/macro/src/context.rs b/src/expr/macro/src/context.rs
index 435651de9b300..683f873042e79 100644
--- a/src/expr/macro/src/context.rs
+++ b/src/expr/macro/src/context.rs
@@ -16,7 +16,7 @@ use itertools::Itertools;
 use proc_macro2::TokenStream;
 use quote::{quote, quote_spanned, ToTokens};
 use syn::parse::{Parse, ParseStream};
-use syn::{Error, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
+use syn::{Error, Expr, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
 
 use crate::utils::extend_vis_with_super;
 
@@ -224,3 +224,17 @@ pub(super) fn generate_captured_function(
         #new_user_fn
     })
 }
+
+pub(super) struct CapturedExecutionContextScopeInput {
+    pub context: Expr,
+    pub closure: Expr,
+}
+
+impl Parse for CapturedExecutionContextScopeInput {
+    fn parse(input: ParseStream<'_>) -> Result<Self> {
+        let context: Expr = input.parse()?;
+        input.parse::<Token![,]>()?;
+        let closure: Expr = input.parse()?;
+        Ok(Self { context, closure })
+    }
+}
diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs
index 544b369072d79..0da623001a2ed 100644
--- a/src/expr/macro/src/lib.rs
+++ b/src/expr/macro/src/lib.rs
@@ -15,13 +15,17 @@
 #![feature(lint_reasons)]
 #![feature(let_chains)]
 
-use context::DefineContextAttr;
+use std::vec;
+
+use context::{
+    generate_captured_function, CaptureContextAttr, CapturedExecutionContextScopeInput,
+    DefineContextAttr,
+};
 use proc_macro::TokenStream;
 use proc_macro2::TokenStream as TokenStream2;
+use quote::{format_ident, quote};
 use syn::{Error, ItemFn, Result};
 
-use crate::context::{generate_captured_function, CaptureContextAttr};
-
 mod context;
 mod gen;
 mod parse;
@@ -645,3 +649,37 @@ pub fn capture_context(attr: TokenStream, item: TokenStream) -> TokenStream {
         Err(e) => e.to_compile_error().into(),
     }
 }
+
+/// Adds a scope that provides the captured context variables to the closure.
+/// For now, we add this when building an `Executor` and when an `Executor` is run.
+#[proc_macro]
+pub fn captured_execution_context_scope(input: TokenStream) -> TokenStream {
+    fn inner(input: TokenStream) -> Result<TokenStream2> {
+        let CapturedExecutionContextScopeInput { context, closure } = syn::parse(input)?;
+
+        let ctx = quote! { let ctx = #context; };
+        let mut body = quote! { #closure };
+        let fields = vec![format_ident!("time_zone")];
+        fields.iter().for_each(|field| {
+            let local_key_name = format_ident!("{}", field.to_string().to_uppercase());
+            body = quote! {
+                async {
+                    use risingwave_expr::captured_execution_context::#local_key_name;
+                    #local_key_name::scope(ctx.#field.to_owned(), #body).await
+                }
+            };
+        });
+        body = quote! {
+            async {
+                #ctx
+                #body.await
+            }
+        };
+        Ok(body)
+    }
+
+    match inner(input) {
+        Ok(tokens) => tokens.into(),
+        Err(e) => e.to_compile_error().into(),
+    }
+}
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index c90804f169b49..ada566ad81797 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -24,7 +24,6 @@ auto_enums = { version = "0.8", features = ["futures03"] }
 auto_impl = "1"
 bk-tree = "0.5.0"
 bytes = "1"
-chrono = { version = "0.4" }
 clap = { version = "4", features = ["derive"] }
 downcast-rs = "1.2"
 dyn-clone = "1.0.14"
diff --git a/src/frontend/src/expr/function_impl/context.rs b/src/frontend/src/expr/function_impl/context.rs
index ae81e31cc270b..21c55fb6ab7e6 100644
--- a/src/frontend/src/expr/function_impl/context.rs
+++ b/src/frontend/src/expr/function_impl/context.rs
@@ -19,10 +19,10 @@ use risingwave_expr::define_context;
 
 use crate::session::AuthContext;
 
+// Only for local mode.
 define_context! {
     pub(super) CATALOG_READER: crate::catalog::CatalogReader,
     pub(super) AUTH_CONTEXT: Arc<AuthContext>,
     pub(super) DB_NAME: String,
     pub(super) SEARCH_PATH: SearchPath,
-    pub(super) TIME_ZONE: String,
 }
diff --git a/src/frontend/src/expr/function_impl/mod.rs b/src/frontend/src/expr/function_impl/mod.rs
index ff13a93b6da7f..1f31b7f307dac 100644
--- a/src/frontend/src/expr/function_impl/mod.rs
+++ b/src/frontend/src/expr/function_impl/mod.rs
@@ -15,4 +15,3 @@
 mod cast_regclass;
 mod col_description;
 pub mod context;
-mod make_timestamptz;
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index cee3db2986cf5..f38e3d7201933 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -36,12 +36,14 @@ use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::select_all;
 use risingwave_connector::source::SplitMetaData;
+use risingwave_expr::captured_execution_context_scope;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::{
     DistributedLookupJoinNode, ExchangeNode, ExchangeSource, MergeSortExchangeNode, PlanFragment,
     PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb, TaskOutputId,
 };
 use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse};
 use risingwave_rpc_client::ComputeClientPoolRef;
 use tokio::spawn;
@@ -332,7 +334,11 @@ impl StageRunner {
 
     /// Schedule all tasks to the CNs and process all status messages from the RPC. Note that when
    /// all tasks are created, it should tell `QueryRunner` to schedule the next stage.
-    async fn schedule_tasks(&mut self, mut shutdown_rx: ShutdownToken) -> SchedulerResult<()> {
+    async fn schedule_tasks(
+        &mut self,
+        mut shutdown_rx: ShutdownToken,
+        captured_execution_context: CapturedExecutionContext,
+    ) -> SchedulerResult<()> {
         let mut futures = vec![];
 
         if let Some(table_scan_info) = self.stage.table_scan_info.as_ref()
@@ -361,7 +367,7 @@ impl StageRunner {
                 let vnode_ranges = vnode_bitmaps[&parallel_unit_id].clone();
                 let plan_fragment =
                     self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges)));
-                futures.push(self.schedule_task(task_id, plan_fragment, Some(worker)));
+                futures.push(self.schedule_task(task_id, plan_fragment, Some(worker), captured_execution_context.clone()));
             }
         } else if let Some(source_info) = self.stage.source_info.as_ref() {
             for (id, split) in source_info.split_info().unwrap().iter().enumerate() {
@@ -374,7 +380,7 @@ impl StageRunner {
                     .create_plan_fragment(id as u32, Some(PartitionInfo::Source(split.clone())));
                 let worker =
                     self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?;
-                futures.push(self.schedule_task(task_id, plan_fragment, worker));
+                futures.push(self.schedule_task(task_id, plan_fragment, worker, captured_execution_context.clone()));
             }
         } else {
             for id in 0..self.stage.parallelism.unwrap() {
@@ -385,7 +391,7 @@ impl StageRunner {
                 };
                 let plan_fragment = self.create_plan_fragment(id, None);
                 let worker = self.choose_worker(&plan_fragment, id, self.stage.dml_table_id)?;
-                futures.push(self.schedule_task(task_id, plan_fragment, worker));
+                futures.push(self.schedule_task(task_id, plan_fragment, worker, captured_execution_context.clone()));
             }
         }
 
@@ -548,6 +554,7 @@ impl StageRunner {
     async fn schedule_tasks_for_root(
         &mut self,
         mut shutdown_rx: ShutdownToken,
+        captured_execution_context: CapturedExecutionContext,
     ) -> SchedulerResult<()> {
         let root_stage_id = self.stage.id;
         // Currently, the dml or table scan should never be the root fragment, so the partition is None.
@@ -574,32 +581,35 @@ impl StageRunner {
         );
 
         let shutdown_rx0 = shutdown_rx.clone();
 
-        let executor = executor.build().await?;
-        let chunk_stream = executor.execute();
-        let cancelled = pin!(shutdown_rx.cancelled());
-        #[for_await]
-        for chunk in chunk_stream.take_until(cancelled) {
-            if let Err(ref e) = chunk {
-                if shutdown_rx0.is_cancelled() {
-                    break;
-                }
-                let err_str = e.to_string();
-                // This is possible if The Query Runner drop early before schedule the root
-                // executor. Detail described in https://github.com/risingwavelabs/risingwave/issues/6883#issuecomment-1348102037.
-                // The error format is just channel closed so no care.
-                if let Err(_e) = result_tx.send(chunk.map_err(|e| e.into())).await {
-                    warn!("Root executor has been dropped before receive any events so the send is failed");
-                }
-                // Different from below, return this function and report error.
-                return Err(TaskExecutionError(err_str));
-            } else {
-                // Same for below.
-                if let Err(_e) = result_tx.send(chunk.map_err(|e| e.into())).await {
-                    warn!("Root executor has been dropped before receive any events so the send is failed");
-                }
-            }
-        }
+        captured_execution_context_scope!(captured_execution_context, async {
+            let executor = executor.build().await?;
+            let chunk_stream = executor.execute();
+            let cancelled = pin!(shutdown_rx.cancelled());
+            #[for_await]
+            for chunk in chunk_stream.take_until(cancelled) {
+                if let Err(ref e) = chunk {
+                    if shutdown_rx0.is_cancelled() {
+                        break;
+                    }
+                    let err_str = e.to_string();
+                    // This is possible if the `QueryRunner` is dropped early, before it schedules
+                    // the root executor. Details described in https://github.com/risingwavelabs/risingwave/issues/6883#issuecomment-1348102037.
+                    // The error is just "channel closed", so there is nothing to handle.
+                    if let Err(_e) = result_tx.send(chunk.map_err(|e| e.into())).await {
+                        warn!("Root executor has been dropped before receive any events so the send is failed");
+                    }
+                    // Different from below: return from this function and report the error.
+                    return Err(TaskExecutionError(err_str));
+                } else {
+                    // Same as below.
+                    if let Err(_e) = result_tx.send(chunk.map_err(|e| e.into())).await {
+                        warn!("Root executor has been dropped before receive any events so the send is failed");
+                    }
+                }
+            }
+            Ok(())
+        }).await?;
 
         // Terminated by other tasks execution error, so no need to return error here.
         match shutdown_rx0.message() {
@@ -622,11 +632,16 @@ impl StageRunner {
     }
 
     async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> {
+        let captured_execution_context = CapturedExecutionContext {
+            time_zone: self.ctx.session().config().timezone().to_owned(),
+        };
         // If root, we execute it locally.
         if !self.is_root_stage() {
-            self.schedule_tasks(shutdown_rx).await?;
+            self.schedule_tasks(shutdown_rx, captured_execution_context)
+                .await?;
         } else {
-            self.schedule_tasks_for_root(shutdown_rx).await?;
+            self.schedule_tasks_for_root(shutdown_rx, captured_execution_context)
+                .await?;
         }
         Ok(())
     }
@@ -824,6 +839,7 @@ impl StageRunner {
         task_id: TaskIdPb,
         plan_fragment: PlanFragment,
         worker: Option<WorkerNode>,
+        captured_execution_context: CapturedExecutionContext,
     ) -> SchedulerResult<Fuse<Streaming<TaskInfoResponse>>> {
         let mut worker = worker.unwrap_or(self.worker_node_manager.next_random_worker()?);
         let worker_node_addr = worker.host.take().unwrap();
@@ -835,8 +851,14 @@ impl StageRunner {
             .map_err(|e| anyhow!(e))?;
 
         let t_id = task_id.task_id;
-        let stream_status = compute_client
-            .create_task(task_id, plan_fragment, self.epoch.clone())
+
+        let stream_status: Fuse<Streaming<TaskInfoResponse>> = compute_client
+            .create_task(
+                task_id,
+                plan_fragment,
+                self.epoch.clone(),
+                captured_execution_context,
+            )
             .await
             .inspect_err(|_| self.mask_failed_serving_worker(&worker))
             .map_err(|e| anyhow!(e))?
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 3823733b74824..3a043ab504fbc 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -164,8 +164,10 @@ impl LocalQueryExecution {
             }
         };
 
+        use risingwave_expr::captured_execution_context::TIME_ZONE;
+
         use crate::expr::function_impl::context::{
-            AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, TIME_ZONE,
+            AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH,
         };
 
         let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await };
diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs
index 15516380bd418..580e36eecc11e 100644
--- a/src/rpc_client/src/compute_client.rs
+++ b/src/rpc_client/src/compute_client.rs
@@ -31,6 +31,7 @@ use risingwave_pb::monitor_service::{
     ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
     StackTraceRequest, StackTraceResponse,
 };
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient;
 use risingwave_pb::task_service::task_service_client::TaskServiceClient;
 use risingwave_pb::task_service::{
@@ -158,6 +159,7 @@ impl ComputeClient {
         task_id: TaskId,
         plan: PlanFragment,
         epoch: BatchQueryEpoch,
+        captured_execution_context: CapturedExecutionContext,
     ) -> Result<Streaming<TaskInfoResponse>> {
         Ok(self
             .task_client
@@ -167,6 +169,7 @@ impl ComputeClient {
                 plan: Some(plan),
                 epoch: Some(epoch),
                 tracing_context: TracingContext::from_current_span().to_protobuf(),
+                captured_execution_context: Some(captured_execution_context),
             })
             .await?
             .into_inner())
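
How the pieces fit together, end to end: the frontend snapshots the session time zone into the
`CapturedExecutionContext` proto, ships it alongside `CreateTaskRequest`/`ExecuteRequest`, and the
compute node re-installs it around executor building and execution via
`captured_execution_context_scope!`. Below is a minimal, self-contained sketch of that scoping
pattern, assuming `define_context!` expands to something like tokio's task-locals (consistent with
the new `madsim-tokio` dependency with the "rt" and "macros" features); all names here are
illustrative stand-ins, not RisingWave APIs.

use tokio::task_local;

task_local! {
    // Hypothetical stand-in for what `define_context! { pub TIME_ZONE: String }`
    // presumably generates: a task-local that expression code can read without
    // threading a parameter through every call site.
    static TIME_ZONE: String;
}

// Mirrors `capture_execution_context` above: snapshot the value installed in the
// current scope, failing if no enclosing scope has set it.
fn capture_time_zone() -> Result<String, String> {
    TIME_ZONE
        .try_with(ToOwned::to_owned)
        .map_err(|_| "TIME_ZONE is not set in this scope".to_string())
}

#[tokio::main]
async fn main() {
    // Install the session time zone around a future, the way the batch task wraps
    // `ExecutorBuilder::new(...).build()` and `t_1.run(...)` in the diff above.
    let captured = TIME_ZONE
        .scope("US/Pacific".to_string(), async { capture_time_zone() })
        .await;
    assert_eq!(captured.unwrap(), "US/Pacific");
}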
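
For reference, a call such as `captured_execution_context_scope!(ctx_expr, fut_expr)` expands to
roughly the following for the single `time_zone` field (reconstructed from the macro body in
src/expr/macro/src/lib.rs above; `ctx_expr` and `fut_expr` are placeholders): the outer `async`
block evaluates the context expression once, and each captured field is then installed as a
task-local around the inner future.

async {
    // `let ctx = #context;` from the macro: evaluate the context expression once.
    let ctx = ctx_expr;
    async {
        use risingwave_expr::captured_execution_context::TIME_ZONE;
        // One `scope` wrapper is generated per field in `fields`.
        TIME_ZONE::scope(ctx.time_zone.to_owned(), fut_expr).await
    }
    .await
}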