Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(expr): support streaming make_timestamptz #13702

Merged
merged 3 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions e2e_test/streaming/expr_context.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# This test is to verify the expr context propagation, using make_timestamptz as an example
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
set TimeZone to 'America/New_York';

query T
SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33);
----
1973-07-15 08:15:55.330-04:00

statement ok
CREATE TABLE tint(num int);

statement ok
CREATE MATERIALIZED VIEW mv1 as SELECT make_timestamptz(num, num, num, num, num, num, 'Asia/Manila') from tint;

statement ok
CREATE MATERIALIZED VIEW mv2 as SELECT make_timestamptz(num, num, num, num, num, num, 'America/New_York') from tint;

statement ok
CREATE MATERIALIZED VIEW mv3 as SELECT make_timestamptz(num, num, num, num, num, num) from tint;

statement ok
insert into tint values(1);

query TT
select * from mv1;
----
0001-01-01 12:00:59-04:56

query TT
select * from mv2;
----
0001-01-01 01:01:01-04:56

query TT
select * from mv3;
----
0001-01-01 01:01:01-04:56

statement ok
DROP MATERIALIZED VIEW mv1;

statement ok
DROP MATERIALIZED VIEW mv2;

statement ok
DROP MATERIALIZED VIEW mv3;

statement ok
DROP TABLE tint;

statement ok
set timezone to 'UTC';
2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,6 @@ message Cardinality {
}

// Provide statement-local context, e.g. session info like time zone, for execution.
message CapturedExecutionContext {
message ExprContext {
string time_zone = 1;
}
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ message StreamActor {
common.Buffer vnode_bitmap = 8;
// The SQL definition of this materialized view. Used for debugging only.
string mview_definition = 9;
// Provide the necessary context, e.g. session info like time zone, for the actor.
plan_common.ExprContext expr_context = 10;
}

enum FragmentTypeFlag {
Expand Down
4 changes: 2 additions & 2 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
batch_plan.PlanFragment plan = 2;
common.BatchQueryEpoch epoch = 3;
map<string, string> tracing_context = 4;
plan_common.CapturedExecutionContext captured_execution_context = 5;
plan_common.ExprContext expr_context = 5;

Check failure on line 44 in proto/task_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "5" on message "CreateTaskRequest" changed type from "plan_common.CapturedExecutionContext" to "plan_common.ExprContext".
}

message CancelTaskRequest {
Expand All @@ -65,7 +65,7 @@
batch_plan.PlanFragment plan = 2;
common.BatchQueryEpoch epoch = 3;
map<string, string> tracing_context = 4;
plan_common.CapturedExecutionContext captured_execution_context = 5;
plan_common.ExprContext expr_context = 5;

Check failure on line 68 in proto/task_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "5" on message "ExecuteRequest" changed type from "plan_common.CapturedExecutionContext" to "plan_common.ExprContext".
}

service TaskService {
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/execution/grpc_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::{Debug, Formatter};

use futures::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_expr::captured_execution_context::capture_execution_context;
use risingwave_expr::expr_context::capture_expr_context;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::{self, Plan};
use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::task_service::{ExecuteRequest, GetDataResponse};
Expand Down Expand Up @@ -51,7 +51,7 @@ impl GrpcExchangeSource {
plan: plan.plan,
epoch: plan.epoch,
tracing_context: plan.tracing_context,
captured_execution_context: Some(capture_execution_context()?),
expr_context: Some(capture_expr_context()?),
};
client.execute(execute_request).await?
}
Expand Down
11 changes: 5 additions & 6 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl TaskService for BatchServiceImpl {
plan,
epoch,
tracing_context,
captured_execution_context,
expr_context,
} = request.into_inner();

let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
Expand All @@ -81,7 +81,7 @@ impl TaskService for BatchServiceImpl {
),
state_reporter,
TracingContext::from_protobuf(&tracing_context),
captured_execution_context.expect("no captured execution context found"),
expr_context.expect("no expression context found"),
)
.await;
match res {
Expand Down Expand Up @@ -135,15 +135,14 @@ impl BatchServiceImpl {
plan,
epoch,
tracing_context,
captured_execution_context,
expr_context,
} = req;

let task_id = task_id.expect("no task id found");
let plan = plan.expect("no plan found").clone();
let epoch = epoch.expect("no epoch found");
let tracing_context = TracingContext::from_protobuf(&tracing_context);
let captured_execution_context =
captured_execution_context.expect("no captured execution context found");
let expr_context = expr_context.expect("no expression context found");

let context = ComputeNodeContext::new_for_local(env.clone());
trace!(
Expand All @@ -156,7 +155,7 @@ impl BatchServiceImpl {
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
if let Err(e) = task
.clone()
.async_execute(None, tracing_context, captured_execution_context)
.async_execute(None, tracing_context, expr_context)
.await
{
error!(
Expand Down
18 changes: 9 additions & 9 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use risingwave_common::array::DataChunk;
use risingwave_common::util::panic::FutureCatchUnwindExt;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tracing::TracingContext;
use risingwave_expr::captured_execution_context_scope;
use risingwave_expr::expr_context::expr_context_scope;
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use risingwave_pb::PbFieldNotFound;
Expand Down Expand Up @@ -428,7 +428,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
self: Arc<Self>,
state_tx: Option<StateReporter>,
tracing_context: TracingContext,
captured_execution_context: CapturedExecutionContext,
expr_context: ExprContext,
) -> Result<()> {
let mut state_tx = state_tx;
trace!(
Expand All @@ -437,16 +437,16 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
serde_json::to_string_pretty(self.plan.get_root()?).unwrap()
);

let exec = captured_execution_context_scope!(
captured_execution_context.clone(),
let exec = expr_context_scope(
expr_context.clone(),
ExecutorBuilder::new(
self.plan.root.as_ref().unwrap(),
&self.task_id,
self.context.clone(),
self.epoch.clone(),
self.shutdown_rx.clone(),
)
.build()
.build(),
)
.await?;

Expand Down Expand Up @@ -479,9 +479,9 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

// We should only pass a reference of sender to execution because we should only
// close it after task error has been set.
captured_execution_context_scope!(
captured_execution_context,
t_1.run(exec, sender, state_tx.as_mut()).instrument(span)
expr_context_scope(
expr_context,
t_1.run(exec, sender, state_tx.as_mut()).instrument(span),
)
.await;
};
Expand Down
20 changes: 8 additions & 12 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl BatchManager {
context: ComputeNodeContext,
state_reporter: StateReporter,
tracing_context: TracingContext,
captured_execution_context: CapturedExecutionContext,
expr_context: ExprContext,
) -> Result<()> {
trace!("Received task id: {:?}, plan: {:?}", tid, plan);
let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime())?;
Expand All @@ -130,15 +130,11 @@ impl BatchManager {
task_id,
);
};
task.async_execute(
Some(state_reporter),
tracing_context,
captured_execution_context,
)
.await
.inspect_err(|_| {
self.cancel_task(&task_id.to_prost());
})?;
task.async_execute(Some(state_reporter), tracing_context, expr_context)
.await
.inspect_err(|_| {
self.cancel_task(&task_id.to_prost());
})?;
ret
}

Expand All @@ -157,7 +153,7 @@ impl BatchManager {
ComputeNodeContext::for_test(),
StateReporter::new_with_test(),
TracingContext::none(),
CapturedExecutionContext {
ExprContext {
time_zone: "UTC".to_string(),
},
)
Expand Down
6 changes: 1 addition & 5 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"macros",
] }
tracing = "0.1"
Expand All @@ -57,10 +57,6 @@ workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
expect-test = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt-multi-thread",
"macros",
] }

[lints]
workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use risingwave_expr::{define_context, Result as ExprResult};
use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::plan_common::ExprContext;

// For all execution mode.
define_context! {
pub TIME_ZONE: String,
}

pub fn capture_execution_context() -> ExprResult<CapturedExecutionContext> {
pub fn capture_expr_context() -> ExprResult<ExprContext> {
let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
Ok(CapturedExecutionContext { time_zone })
Ok(ExprContext { time_zone })
}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
where
Fut: Future,
{
TIME_ZONE::scope(expr_context.time_zone.to_owned(), future).await
}
2 changes: 1 addition & 1 deletion src/expr/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
extern crate self as risingwave_expr;

pub mod aggregate;
pub mod captured_execution_context;
#[doc(hidden)]
pub mod codegen;
mod error;
pub mod expr;
pub mod expr_context;
pub mod scalar;
pub mod sig;
pub mod table_function;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/src/scalar/make_timestamptz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use risingwave_common::types::{FloatExt, Timestamp, Timestamptz, F64};
use risingwave_expr::captured_execution_context::TIME_ZONE;
use risingwave_expr::expr_context::TIME_ZONE;
use risingwave_expr::{capture_context, function, ExprError, Result};

use crate::scalar::timestamptz::timestamp_at_time_zone;
Expand Down
16 changes: 1 addition & 15 deletions src/expr/macro/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use itertools::Itertools;
use proc_macro2::TokenStream;
use quote::{quote, quote_spanned, ToTokens};
use syn::parse::{Parse, ParseStream};
use syn::{Error, Expr, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
use syn::{Error, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};

use crate::utils::extend_vis_with_super;

Expand Down Expand Up @@ -224,17 +224,3 @@ pub(super) fn generate_captured_function(
#new_user_fn
})
}

pub(super) struct CapturedExecutionContextScopeInput {
pub context: Expr,
pub closure: Expr,
}

impl Parse for CapturedExecutionContextScopeInput {
fn parse(input: ParseStream<'_>) -> Result<Self> {
let context: Expr = input.parse()?;
input.parse::<Token![,]>()?;
let closure: Expr = input.parse()?;
Ok(Self { context, closure })
}
}
Loading
Loading