Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_obj_config
Li0k committed Nov 24, 2023
2 parents 37cd49a + 3ccb249 commit 0492995
Showing 20 changed files with 431 additions and 83 deletions.
2 changes: 1 addition & 1 deletion java/pom.xml
@@ -75,7 +75,7 @@
<jackson.version>2.13.5</jackson.version>
<spark_sql.version>3.3.1</spark_sql.version>
<hadoop.version>3.3.3</hadoop.version>
- <elasticsearch.version>7.17.13</elasticsearch.version>
+ <elasticsearch.version>7.17.14</elasticsearch.version>
<datastax.version>4.15.0</datastax.version>
</properties>

4 changes: 4 additions & 0 deletions src/common/src/config.rs
@@ -212,6 +212,10 @@ pub struct MetaConfig {
#[serde(default)]
pub enable_scale_in_when_recovery: bool,

+     /// Whether to enable auto-scaling feature.
+     #[serde(default)]
+     pub enable_automatic_parallelism_control: bool,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

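Note: `#[serde(default)]` on a `bool` field means the new flag reads as `false` whenever the key is absent from the config file. A minimal self-contained sketch of that behavior (a simplified stand-in for `MetaConfig`, assuming the `serde` and `toml` crates):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct MetaConfigSketch {
    // A missing key falls back to bool::default(), i.e. false.
    #[serde(default)]
    enable_automatic_parallelism_control: bool,
}

fn main() {
    let cfg: MetaConfigSketch = toml::from_str("").unwrap();
    assert!(!cfg.enable_automatic_parallelism_control);

    let cfg: MetaConfigSketch =
        toml::from_str("enable_automatic_parallelism_control = true").unwrap();
    assert!(cfg.enable_automatic_parallelism_control);
}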
5 changes: 5 additions & 0 deletions src/common/src/types/jsonb.rs
@@ -374,6 +374,11 @@ impl<'a> JsonbRef<'a> {
Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" "));
self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
}

+     /// Returns the capacity of the underlying buffer.
+     pub fn capacity(self) -> usize {
+         self.0.capacity()
+     }
}

/// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra
4 changes: 2 additions & 2 deletions src/common/src/util/value_encoding/mod.rs
@@ -130,7 +130,6 @@ pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
ArrayImpl::Float32(_) => Some(4),
ArrayImpl::Float64(_) => Some(8),
ArrayImpl::Bool(_) => Some(1),
- ArrayImpl::Jsonb(_) => Some(8),
ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
@@ -246,7 +245,8 @@ fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
ScalarRefImpl::Timestamptz(_) => 8,
ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
- ScalarRefImpl::Jsonb(_) => 8,
+ // not exact as we use internal encoding size to estimate the json string size
+ ScalarRefImpl::Jsonb(v) => v.capacity(),
ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
}
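The change above replaces the old fixed 8-byte guess for `Jsonb` with the capacity of the value's internal encoding buffer: still O(1), but it tracks the real value size far more closely. A self-contained analog of the estimation idea (simplified types, not the actual RisingWave enums):

enum ScalarSketch<'a> {
    Int64(i64),
    Bool(bool),
    // Stand-in for JsonbRef: a view over the value's encoded buffer.
    Jsonb(&'a Vec<u8>),
}

fn estimate_serialize_size(v: &ScalarSketch<'_>) -> usize {
    match v {
        ScalarSketch::Int64(_) => 8,
        ScalarSketch::Bool(_) => 1,
        // Not exact: the internal encoding's capacity is a cheap proxy
        // for the serialized JSON string size.
        ScalarSketch::Jsonb(buf) => buf.capacity(),
    }
}

fn main() {
    let encoded = br#"{"k":1}"#.to_vec();
    let v = ScalarSketch::Jsonb(&encoded);
    println!("estimated size = {}", estimate_serialize_size(&v));
}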
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -24,6 +24,7 @@ min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
enable_scale_in_when_recovery = false
+ enable_automatic_parallelism_control = false
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
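To opt in, a deployment would flip the new key in the same `[meta]` section (a sketch of the non-default setting; as the diff above shows, it ships disabled):

[meta]
enable_automatic_parallelism_control = true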
@@ -50,7 +50,7 @@
"stages": {
"0": {
"root": {
- "plan_node_id": 10013,
+ "plan_node_id": 10010,
"plan_node_type": "BatchValues",
"schema": [
{
@@ -1127,7 +1127,7 @@
JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S
ON mod(B.auction, 10000) = S.key
sink_plan: |-
- StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10022(hidden), side_input.key(hidden)] }
+ StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] }
└─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
@@ -62,7 +62,7 @@ pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
pub use parameter::Parameter;
pub use pure::*;
pub use risingwave_pb::expr::expr_node::Type as ExprType;
- pub use session_timezone::SessionTimezone;
+ pub use session_timezone::{SessionTimezone, TimestamptzExprFinder};
pub use subquery::{Subquery, SubqueryKind};
pub use table_function::{TableFunction, TableFunctionType};
pub use type_inference::{
34 changes: 33 additions & 1 deletion src/frontend/src/expr/session_timezone.rs
@@ -17,7 +17,7 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;

pub use crate::expr::expr_rewriter::ExprRewriter;
pub use crate::expr::function_call::FunctionCall;
- use crate::expr::{Expr, ExprImpl};
+ use crate::expr::{Expr, ExprImpl, ExprVisitor};
use crate::session::current;

/// `SessionTimezone` will be used to resolve session
@@ -264,3 +264,35 @@ impl SessionTimezone {
.into()
}
}

+ #[derive(Default)]
+ pub struct TimestamptzExprFinder {
+     has: bool,
+ }
+
+ impl TimestamptzExprFinder {
+     pub fn has(&self) -> bool {
+         self.has
+     }
+ }
+
+ impl ExprVisitor for TimestamptzExprFinder {
+     fn visit_function_call(&mut self, func_call: &FunctionCall) {
+         if func_call.return_type() == DataType::Timestamptz {
+             self.has = true;
+             return;
+         }
+
+         for input in &func_call.inputs {
+             if input.return_type() == DataType::Timestamptz {
+                 self.has = true;
+                 return;
+             }
+         }
+
+         func_call
+             .inputs()
+             .iter()
+             .for_each(|expr| self.visit_expr(expr));
+     }
+ }
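The intended call pattern mirrors the optimizer change below: walk the plan's expressions first, and only run the session-timezone rewrite when a `Timestamptz` expression is actually present. A sketch (assuming `plan` implements `VisitExprsRecursive`):

let mut v = TimestamptzExprFinder::default();
plan.visit_exprs_recursive(&mut v);
if v.has() {
    // Timestamptz found somewhere in the plan: the rewrite is worth paying for.
}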
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/logical_optimization.rs
@@ -489,7 +489,7 @@ impl LogicalOptimizer {
}

pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
- // If now() and proctime() are no found, bail out.
+ // If now() and proctime() are not found, bail out.
let mut v = NowProcTimeFinder::default();
plan.visit_exprs_recursive(&mut v);
if !v.has() {
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/mod.rs
@@ -60,10 +60,11 @@ use self::plan_visitor::{has_batch_exchange, CardinalityVisitor};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
+ use crate::expr::TimestamptzExprFinder;
use crate::optimizer::plan_node::generic::Union;
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
- ToStream,
+ ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::TemporalJoinValidator;
use crate::optimizer::property::Distribution;
@@ -739,8 +740,13 @@ fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
}

fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
-     let plan = plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut());
-     Ok(plan)
+     let mut v = TimestamptzExprFinder::default();
+     plan.visit_exprs_recursive(&mut v);
+     if v.has() {
+         Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
+     } else {
+         Ok(plan)
+     }
}

fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
3 changes: 3 additions & 0 deletions src/meta/node/src/lib.rs
@@ -243,6 +243,9 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
MetaOpts {
enable_recovery: !config.meta.disable_recovery,
enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery,
+ enable_automatic_parallelism_control: config
+     .meta
+     .enable_automatic_parallelism_control,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
4 changes: 4 additions & 0 deletions src/meta/node/src/server.rs
@@ -677,6 +677,10 @@ pub async fn start_service_as_election_leader(
Duration::from_secs(1),
));
sub_tasks.push(GlobalBarrierManager::start(barrier_manager));

+ if env.opts.enable_automatic_parallelism_control {
+     sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
+ }
}
let (idle_send, idle_recv) = tokio::sync::oneshot::channel();
sub_tasks.push(IdleManager::start_idle_checker(
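The monitor is registered like any other meta sub-task, but only when the flag is enabled. A self-contained sketch of this gating pattern (hypothetical task body; not the actual `start_auto_parallelism_monitor` internals):

// Feature-gated sub-tasks are simply skipped when their flag is off.
let mut sub_tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
if enable_automatic_parallelism_control {
    sub_tasks.push(tokio::spawn(async {
        // e.g. watch worker membership and rebalance fragment parallelism
    }));
}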
4 changes: 4 additions & 0 deletions src/meta/src/manager/env.rs
@@ -86,6 +86,9 @@ pub struct MetaOpts {
pub enable_recovery: bool,
/// Whether to enable the scale-in feature when compute-node is removed.
pub enable_scale_in_when_recovery: bool,
+ /// Whether to enable the auto-scaling feature when compute-node is joined.
+ /// The semantics of this configuration will be expanded in the future to control the automatic scaling of the entire cluster.
+ pub enable_automatic_parallelism_control: bool,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -186,6 +189,7 @@ impl MetaOpts {
Self {
enable_recovery,
enable_scale_in_when_recovery: false,
+ enable_automatic_parallelism_control: false,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,