diff --git a/e2e_test/source/basic/kafka_batch.slt b/e2e_test/source/basic/kafka_batch.slt
index 8d8d454c7c977..a81acf49518b5 100644
--- a/e2e_test/source/basic/kafka_batch.slt
+++ b/e2e_test/source/basic/kafka_batch.slt
@@ -92,6 +92,25 @@ select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00'
 3 333
 4 4444
 
+query IT rowsort
+select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00'
+----
+1 1
+2 22
+3 333
+4 4444
+
+query IT rowsort
+select * from s1 where _rw_kafka_timestamp > TO_TIMESTAMP('1977-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
+----
+1 1
+2 22
+3 333
+4 4444
+
+statement error expected format
+select * from s1 where _rw_kafka_timestamp > 'abc'
+
 query IT
 select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00'
 ----
@@ -201,4 +220,4 @@ statement ok
 drop table s8
 
 statement ok
-drop source s9
\ No newline at end of file
+drop source s9
diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml
index f54f6a837343f..e2f7dc9bde0dd 100644
--- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml
@@ -84,5 +84,5 @@
   sql: |
     with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t;
   batch_plan: |-
-    BatchProject { exprs: [All((1:Int32 < $expr10060)) as $expr1, Some((1:Int32 < $expr10060)) as $expr2] }
+    BatchProject { exprs: [All((1:Int32 < $expr10064)) as $expr1, Some((1:Int32 < $expr10064)) as $expr2] }
     └─BatchValues { rows: [[1:Int32, ARRAY[2, 3]:List(Int32)]] }
diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml
index 157736e53b21d..1662624eea519 100644
--- a/src/frontend/planner_test/tests/testdata/output/explain.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml
@@ -70,7 +70,7 @@
       "stages": {
         "0": {
           "root": {
-            "plan_node_id": 10036,
+            "plan_node_id": 10038,
             "plan_node_type": "BatchValues",
             "schema": [
               {
diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml
index 9492491c08aaa..3effdc191eab0 100644
--- a/src/frontend/planner_test/tests/testdata/output/expr.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml
@@ -450,7 +450,7 @@
       └─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
         └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
   batch_plan: |-
-    BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] }
+    BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10042, ARRAY[2]:List(Int32)))) as $expr1] }
     └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
       ├─BatchValues { rows: [[]] }
       └─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
@@ -473,7 +473,7 @@
       └─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
         └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
   batch_plan: |-
-    BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] }
+    BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10042, ARRAY[2]:List(Int32)))) as $expr1] }
     └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
       ├─BatchValues { rows: [[]] }
       └─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
@@ -497,7 +497,7 @@
     select * from t where v1 >= now() - INTERVAL '2' SECOND;
   logical_plan: |-
     LogicalProject { exprs: [t.v1] }
-    └─LogicalFilter { predicate: (t.v1 >= (Now - '00:00:02':Interval)) }
+    └─LogicalFilter { predicate: (t.v1 >= SubtractWithTimeZone(Now, '00:00:02':Interval, 'UTC':Varchar)) }
       └─LogicalScan { table: t, columns: [t.v1, t._row_id] }
   stream_plan: |-
     StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
diff --git a/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml b/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml
index ae37459ef7bed..9424cce324607 100644
--- a/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml
@@ -256,8 +256,8 @@
     select * from t1 cross join t2 where v1 = v2 and v1 > now() + '1 hr';
   optimized_logical_plan_for_batch: |-
     LogicalJoin { type: Inner, on: (t1.v1 = t2.v2), output: all }
-    ├─LogicalScan { table: t1, columns: [t1.v1], predicate: (t1.v1 > ('2021-04-01 00:00:00+00:00':Timestamptz + '01:00:00':Interval)) }
-    └─LogicalScan { table: t2, columns: [t2.v2], predicate: (t2.v2 > ('2021-04-01 00:00:00+00:00':Timestamptz + '01:00:00':Interval)) }
+    ├─LogicalScan { table: t1, columns: [t1.v1], predicate: (t1.v1 > AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, '01:00:00':Interval, 'UTC':Varchar)) }
+    └─LogicalScan { table: t2, columns: [t2.v2], predicate: (t2.v2 > AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, '01:00:00':Interval, 'UTC':Varchar)) }
   stream_plan: |-
     StreamMaterialize { columns: [v1, v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, v1], pk_columns: [t1._row_id, t2._row_id, v1], pk_conflict: NoCheck }
     └─StreamHashJoin { type: Inner, predicate: t1.v1 = t2.v2, output: [t1.v1, t2.v2, t1._row_id, t2._row_id] }
@@ -277,7 +277,7 @@
   optimized_logical_plan_for_batch: |-
     LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) AND (t1.v1 > $expr1), output: [t1.v1, t2.v2, t2.v3] }
     ├─LogicalScan { table: t1, columns: [t1.v1] }
-    └─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] }
+    └─LogicalProject { exprs: [t2.v2, t2.v3, AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, t2.v3, 'UTC':Varchar) as $expr1] }
       └─LogicalScan { table: t2, columns: [t2.v2, t2.v3] }
   stream_error: 'internal error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.'
 - name: now() in complex cmp expr pushed onto join ON clause results in dynamic filter
@@ -288,14 +288,14 @@
   optimized_logical_plan_for_batch: |-
     LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) AND (t1.v1 > $expr1), output: [t1.v1, t2.v2, t2.v3] }
     ├─LogicalScan { table: t1, columns: [t1.v1] }
-    └─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] }
+    └─LogicalProject { exprs: [t2.v2, t2.v3, AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, t2.v3, 'UTC':Varchar) as $expr1] }
       └─LogicalScan { table: t2, columns: [t2.v2, t2.v3] }
   stream_error: 'internal error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.'
 - name: now() does not get pushed to scan, but others do
   sql: |
     create table t1(v1 timestamp with time zone, v2 int);
     select * from t1 where v1 > now() + '30 min' and v2 > 5;
-  optimized_logical_plan_for_batch: 'LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > (''2021-04-01 00:00:00+00:00'':Timestamptz + ''00:30:00'':Interval)) AND (t1.v2 > 5:Int32) }'
+  optimized_logical_plan_for_batch: 'LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > AddWithTimeZone(''2021-04-01 00:00:00+00:00'':Timestamptz, ''00:30:00'':Interval, ''UTC'':Varchar)) AND (t1.v2 > 5:Int32) }'
   stream_plan: |-
     StreamMaterialize { columns: [v1, v2, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
     └─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: [t1.v1, t1.v2, t1._row_id], cleaned_by_watermark: true }
diff --git a/src/frontend/planner_test/tests/testdata/output/types.yaml b/src/frontend/planner_test/tests/testdata/output/types.yaml
index 4f4cff47b8c11..dbc7d1e20a191 100644
--- a/src/frontend/planner_test/tests/testdata/output/types.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/types.yaml
@@ -15,7 +15,7 @@
 - sql: values ('1'::float(53));
   logical_plan: 'LogicalValues { rows: [[1:Float64]], schema: Schema { fields: [*VALUES*_0.column_0:Float64] } }'
 - sql: values (''::timestamp with time zone);
-  logical_plan: 'LogicalValues { rows: [['''':Varchar::Timestamptz]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamptz] } }'
+  logical_plan: 'LogicalValues { rows: [[CastWithTimeZone('''':Varchar, ''UTC'':Varchar)]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamptz] } }'
 - sql: values (''::time with time zone);
   binder_error: |-
     Bind error: failed to bind expression: CAST('' AS TIME WITH TIME ZONE)
diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml
index d1916a33192c6..b5abee5d22608 100644
--- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml
@@ -4,7 +4,7 @@
     create source t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kinesis') FORMAT PLAIN ENCODE JSON;
     select t.v1 - INTERVAL '2' SECOND as v1 from t;
   logical_plan: |-
-    LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] }
+    LogicalProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1] }
     └─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) }
   stream_plan: |-
     StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
@@ -103,7 +103,7 @@
     select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 left outer join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND));
   logical_plan: |-
     LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] }
-    └─LogicalJoin { type: LeftOuter, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all }
+    └─LogicalJoin { type: LeftOuter, on: (t1.v1 = t2.v1) AND (t1.ts >= AddWithTimeZone(t2.ts, '00:00:01':Interval, 'UTC':Varchar)) AND (t2.ts >= AddWithTimeZone(t1.ts, '00:00:01':Interval, 'UTC':Varchar)), output: all }
       ├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] }
       └─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] }
   stream_plan: |-
@@ -122,7 +122,7 @@
     select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND));
   logical_plan: |-
     LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] }
-    └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all }
+    └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= AddWithTimeZone(t2.ts, '00:00:01':Interval, 'UTC':Varchar)) AND (t2.ts >= AddWithTimeZone(t1.ts, '00:00:01':Interval, 'UTC':Varchar)), output: all }
       ├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] }
       └─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] }
   stream_plan: |-
diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs
index 0af30ccb364f8..756ef33c3ed98 100644
--- a/src/frontend/src/expr/mod.rs
+++ b/src/frontend/src/expr/mod.rs
@@ -789,11 +789,16 @@ impl ExprImpl {
         if let ExprImpl::Now(_) = self {
             true
         } else if let ExprImpl::FunctionCall(f) = self {
+            // TODO: `now() + interval '1' month` shall not be accepted as const offset
             match f.func_type() {
                 ExprType::Add | ExprType::Subtract => {
                     let (_, lhs, rhs) = f.clone().decompose_as_binary();
                     lhs.is_now_offset() && rhs.is_const()
                 }
+                ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => {
+                    let args = f.inputs();
+                    args[0].is_now_offset() && args[1].is_const()
+                }
                 _ => false,
             }
         } else {
diff --git a/src/frontend/src/expr/session_timezone.rs b/src/frontend/src/expr/session_timezone.rs
index 545eb1427f99b..0630dc38504b0 100644
--- a/src/frontend/src/expr/session_timezone.rs
+++ b/src/frontend/src/expr/session_timezone.rs
@@ -21,6 +21,8 @@ use crate::expr::{Expr, ExprImpl};
 
 /// `SessionTimezone` will be used to resolve session
 /// timezone-dependent casts, comparisons or arithmetic.
+///
+/// This rewrite is idempotent as required by [`crate::optimizer::plan_node::ExprRewritable`].
 pub struct SessionTimezone {
     timezone: String,
     /// Whether or not the session timezone was used
@@ -242,6 +244,8 @@ impl SessionTimezone {
                 new_inputs.push(ExprImpl::literal_varchar(self.timezone()));
                 Some(FunctionCall::new(func_type, new_inputs).unwrap().into())
             }
+            // When adding new rules, make sure to match arity and types of all args
+            // so that the rewrite is idempotent.
             _ => None,
         }
     }
diff --git a/src/frontend/src/expr/utils.rs b/src/frontend/src/expr/utils.rs
index d07287b08dbe2..88c29c143a6f0 100644
--- a/src/frontend/src/expr/utils.rs
+++ b/src/frontend/src/expr/utils.rs
@@ -535,10 +535,10 @@ impl WatermarkAnalyzer {
                     ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_utf8()),
                     _ => return WatermarkDerivation::None,
                 };
-                let interval = match &func_call.inputs()[1] {
-                    ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_interval()),
-                    _ => return WatermarkDerivation::None,
+                let Some(Ok(interval)) = &func_call.inputs()[1].try_fold_const() else {
+                    return WatermarkDerivation::None;
                 };
+                let interval = interval.as_ref().map(|s| s.as_interval());
                 // null zone or null interval is treated same as const `interval '1' second`, to be
                 // consistent with other match arms.
                 let zone_without_dst = time_zone.map_or(true, |s| s.eq_ignore_ascii_case("UTC"));
diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs
index aaab8ebde3dcd..fcb3957120145 100644
--- a/src/frontend/src/optimizer/mod.rs
+++ b/src/frontend/src/optimizer/mod.rs
@@ -97,6 +97,10 @@ impl PlanRoot {
         assert_eq!(input_schema.fields().len(), out_fields.len());
         assert_eq!(out_fields.count_ones(..), out_names.len());
 
+        // Inline session timezone at the very beginning so that (const) exprs can be evaluated.
+        let ctx = plan.ctx();
+        let plan = inline_session_timezone_in_exprs(ctx.clone(), plan);
+
         Self {
             plan,
             required_dist,
@@ -177,8 +181,6 @@ impl PlanRoot {
         }
 
         let ctx = plan.ctx();
-        // Inline session timezone mainly for rewriting now()
-        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
 
         // Convert to physical plan node
         plan = plan.to_batch_with_order_required(&self.required_order)?;
@@ -193,8 +195,8 @@ impl PlanRoot {
             ApplyOrder::BottomUp,
         ));
 
-        // Inline session timezone
-        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
+        // Inline session timezone near the end so that exprs inserted by optimizer are handled.
+        plan = inline_session_timezone_in_exprs(ctx.clone(), plan);
 
         if ctx.is_explain_trace() {
             ctx.trace("Inline Session Timezone:");
@@ -314,8 +316,8 @@ impl PlanRoot {
             ));
         }
 
-        // Inline session timezone
-        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
+        // Inline session timezone near the end so that exprs inserted by optimizer are handled.
+        plan = inline_session_timezone_in_exprs(ctx.clone(), plan);
 
         if ctx.is_explain_trace() {
             ctx.trace("Inline session timezone:");
@@ -488,7 +490,8 @@ impl PlanRoot {
             RequiredDist::ShardByKey(bitset)
         };
 
-        let stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
+        // Inline session timezone near the end so that exprs inserted by optimizer are handled.
+        let stream_plan = inline_session_timezone_in_exprs(context, stream_plan);
 
         StreamMaterialize::create_for_table(
             stream_plan,
@@ -593,9 +596,8 @@ fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
     Ok(plan)
 }
 
-fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
-    let plan = plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut());
-    Ok(plan)
+fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> PlanRef {
+    plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut())
 }
 
 fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 4ac006887ef55..d2dbcf4787eb0 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -369,8 +369,7 @@ fn expr_to_kafka_timestamp_range(
 
     match &expr {
         ExprImpl::FunctionCall(function_call) => {
-            if let Some((timestampz_literal, reverse)) = extract_timestampz_literal(&expr).unwrap()
-            {
+            if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) {
                 match function_call.func_type() {
                     ExprType::GreaterThan => {
                         if reverse {