Merge branch 'main' into bz/project-set-error-fix-again
BugenZhao authored Nov 14, 2024
2 parents fc24982 + a0ff192 commit 1e85b92
Showing 37 changed files with 1,050 additions and 557 deletions.
223 changes: 70 additions & 153 deletions dashboard/package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dashboard/package.json
@@ -63,7 +63,7 @@
"eslint-plugin-n": "^15.2.5",
"eslint-plugin-promise": "^6.0.1",
"eslint-plugin-react": "^7.31.6",
"express": "^4.20.0",
"express": "^4.21.1",
"prettier": "^2.7.1",
"prettier-plugin-organize-imports": "^3.1.1",
"typescript": "5.4.2"
4 changes: 2 additions & 2 deletions proto/meta.proto
@@ -753,8 +753,8 @@ message EventLog {
string error = 3;
}
message EventCollectBarrierFail {
uint64 prev_epoch = 1;
uint64 cur_epoch = 2;
reserved 1, 2;
reserved "prev_epoch", "cur_epoch";
string error = 3;
}
message EventWorkerNodePanic {
10 changes: 10 additions & 0 deletions src/common/src/config.rs
@@ -871,6 +871,12 @@ pub struct StorageConfig {
#[serde(default = "default::storage::compactor_max_overlap_sst_count")]
pub compactor_max_overlap_sst_count: usize,

/// The maximum number of meta files that can be preloaded.
/// If the number of meta files exceeds this value, the compactor computes its parallelism from `SstableInfo` alone and no longer preloads `SstableMeta`.
/// This prevents the compactor from consuming too much memory, but may make compaction less efficient.
#[serde(default = "default::storage::compactor_max_preload_meta_file_count")]
pub compactor_max_preload_meta_file_count: usize,

/// Object storage configuration
/// 1. General configuration
/// 2. Some special configuration of Backend
@@ -1795,6 +1801,10 @@ pub mod default {
64
}

pub fn compactor_max_preload_meta_file_count() -> usize {
32
}

// deprecated
pub fn table_info_statistic_history_times() -> usize {
240
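The new `compactor_max_preload_meta_file_count` knob caps how many `SstableMeta` files the compactor preloads when sizing a task. A minimal sketch of that kind of gating is shown below — the types and function names are hypothetical stand-ins, not RisingWave's actual planner code:

```rust
/// Illustrative sketch only: `SstFile` and `ParallelismSource` are made-up
/// stand-ins for how a threshold like this typically gates meta preloading.
struct SstFile {
    id: u64,
}

enum ParallelismSource {
    /// Compute parallelism from lightweight `SstableInfo`-style stats only.
    InfoOnly,
    /// Preload the full `SstableMeta` of every input file first.
    PreloadedMeta,
}

fn choose_source(files: &[SstFile], max_preload_meta_file_count: usize) -> ParallelismSource {
    if files.len() > max_preload_meta_file_count {
        // Too many meta files to hold in memory at once: take the cheap,
        // less precise path based on `SstableInfo`.
        ParallelismSource::InfoOnly
    } else {
        // Small enough input set: preloading metas is affordable and
        // yields a more accurate parallelism estimate.
        ParallelismSource::PreloadedMeta
    }
}
```

With the default of 32, a compaction task covering, say, 40 input SSTs would take the `InfoOnly` branch in this sketch.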
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -125,6 +125,7 @@ This page is automatically generated by `./risedev generate-example-config`
| compactor_fast_max_compact_task_size | | 2147483648 |
| compactor_iter_max_io_retry_times | | 8 |
| compactor_max_overlap_sst_count | | 64 |
| compactor_max_preload_meta_file_count | The maximum number of meta files that can be preloaded. If the number of meta files exceeds this value, the compactor computes its parallelism from `SstableInfo` alone and no longer preloads `SstableMeta`. This prevents the compactor from consuming too much memory, but may make compaction less efficient. | 32 |
| compactor_max_sst_key_count | | 2097152 |
| compactor_max_sst_size | | 536870912 |
| compactor_max_task_multiplier | Compactor calculates the maximum number of tasks that can be executed on the node based on `worker_num` and `compactor_max_task_multiplier`. `max_pull_task_count` = `worker_num` * `compactor_max_task_multiplier` | 3.0 |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -176,6 +176,7 @@ compactor_iter_max_io_retry_times = 8
table_info_statistic_history_times = 240
mem_table_spill_threshold = 4194304
compactor_max_overlap_sst_count = 64
compactor_max_preload_meta_file_count = 32
time_travel_version_cache_capacity = 32

[storage.cache.block_cache_eviction]
24 changes: 18 additions & 6 deletions src/connector/src/connector_common/iceberg/mod.rs
@@ -185,6 +185,10 @@ impl IcebergCommon {
"org.apache.iceberg.aws.s3.S3FileIO".to_string(),
);

// Suppress S3FileIO log messages like "Unclosed S3FileIO instance created by..."
java_catalog_configs
.insert("init-creation-stacktrace".to_string(), "false".to_string());

if let Some(endpoint) = &self.endpoint {
java_catalog_configs
.insert("s3.endpoint".to_string(), endpoint.clone().to_string());
@@ -258,7 +262,6 @@ mod v1 {

let catalog_type = self.catalog_type().to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());

match catalog_type.as_str() {
@@ -267,14 +270,16 @@
format!("iceberg.catalog.{}.warehouse", self.catalog_name()),
self.warehouse_path.clone(),
);
iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".into());
}
"rest" => {
"rest_rust" => {
let uri = self
.catalog_uri
.clone()
.with_context(|| "`catalog.uri` must be set in rest catalog".to_string())?;
iceberg_configs
.insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri);
iceberg_configs.insert(CATALOG_TYPE.to_string(), "rest".into());
}
_ => {
bail!(
Expand Down Expand Up @@ -351,15 +356,16 @@ mod v1 {
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<CatalogRef> {
match self.catalog_type() {
"storage" | "rest" => {
"storage" | "rest_rust" => {
let iceberg_configs = self.build_iceberg_configs()?;
let catalog = load_catalog(&iceberg_configs).await?;
Ok(catalog)
}
catalog_type
if catalog_type == "hive"
|| catalog_type == "jdbc"
|| catalog_type == "glue" =>
|| catalog_type == "glue"
|| catalog_type == "rest" =>
{
// Create java catalog
let (base_catalog_config, java_catalog_props) =
@@ -368,6 +374,7 @@
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
"rest" => "org.apache.iceberg.rest.RESTCatalog",
_ => unreachable!(),
};

@@ -444,7 +451,7 @@ mod v2 {
let catalog = storage_catalog::StorageCatalog::new(config)?;
Ok(Arc::new(catalog))
}
"rest" => {
"rest_rust" => {
let mut iceberg_configs = HashMap::new();
if let Some(region) = &self.region {
iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string());
@@ -512,13 +519,18 @@
let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?;
Ok(Arc::new(catalog))
}
catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => {
catalog_type
if catalog_type == "hive"
|| catalog_type == "jdbc"
|| catalog_type == "rest" =>
{
// Create java catalog
let (base_catalog_config, java_catalog_props) =
self.build_jni_catalog_configs(java_catalog_props)?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
"rest" => "org.apache.iceberg.rest.RESTCatalog",
_ => unreachable!(),
};

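Taken together, the changes in this file rename the Rust-native REST catalog type to `rest_rust` and route plain `rest` through the JNI bridge to Java's `RESTCatalog`. A simplified, illustrative summary of the resulting dispatch (string labels instead of real catalog objects; the `glue` path, which differs between the `v1` and `v2` modules, is omitted):

```rust
/// Hedged sketch of the catalog-type dispatch after this change; the real code
/// constructs catalog instances rather than returning labels.
fn catalog_backend(catalog_type: &str) -> Result<&'static str, String> {
    match catalog_type {
        // Served by Rust-native implementations.
        "storage" | "rest_rust" => Ok("rust-native catalog"),
        // Routed through the JNI bridge to the corresponding Java class.
        "hive" => Ok("org.apache.iceberg.hive.HiveCatalog"),
        "jdbc" => Ok("org.apache.iceberg.jdbc.JdbcCatalog"),
        "rest" => Ok("org.apache.iceberg.rest.RESTCatalog"),
        other => Err(format!("unsupported catalog type: {other}")),
    }
}
```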
88 changes: 88 additions & 0 deletions src/expr/impl/src/aggregate/first_last_value.rs
@@ -0,0 +1,88 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Datum, ScalarRefImpl};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::aggregate;
use risingwave_expr::aggregate::AggStateDyn;

/// Note that, unlike `min` and `max`, `first_value` doesn't ignore `NULL` values.
///
/// ```slt
/// statement ok
/// create table t(v1 int, ts int);
///
/// statement ok
/// insert into t values (null, 1), (2, 2), (null, 3);
///
/// query I
/// select first_value(v1 order by ts) from t;
/// ----
/// NULL
///
/// statement ok
/// drop table t;
/// ```
#[aggregate("first_value(any) -> any")]
fn first_value(state: &mut FirstValueState, input: Option<ScalarRefImpl<'_>>) {
if state.0.is_none() {
state.0 = Some(input.map(|x| x.into_scalar_impl()));
}
}

#[derive(Debug, Clone, Default, EstimateSize)]
struct FirstValueState(Option<Datum>);

impl AggStateDyn for FirstValueState {}

impl From<&FirstValueState> for Datum {
fn from(state: &FirstValueState) -> Self {
if let Some(state) = &state.0 {
state.clone()
} else {
None
}
}
}

/// Note that, unlike `min` and `max`, `last_value` doesn't ignore `NULL` values.
///
/// ```slt
/// statement ok
/// create table t(v1 int, ts int);
///
/// statement ok
/// insert into t values (null, 1), (2, 2), (null, 3);
///
/// query I
/// select last_value(v1 order by ts) from t;
/// ----
/// NULL
///
/// statement ok
/// drop table t;
/// ```
#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(rc): `last_value(any) -> any`
fn last_value<T>(_: Option<T>, input: Option<T>) -> Option<T> {
input
}

#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)]
fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
if retract {
state
} else {
input
}
}
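A note on the state design: `FirstValueState` wraps a `Datum`, which is itself an `Option`, so the outer `Option` distinguishes "no row seen yet" from "the first row was SQL `NULL`". The following standalone sketch shows the same double-`Option` pattern with a plain `i32` in place of `ScalarImpl` (illustrative only, not the actual aggregate framework):

```rust
// Illustrative only: `Option<Option<i32>>` plays the role of `Option<Datum>`.
fn first_value_step(state: &mut Option<Option<i32>>, input: Option<i32>) {
    if state.is_none() {
        // Record the first input, even when it is SQL NULL (`None`).
        *state = Some(input);
    }
}

fn main() {
    let mut state = None; // no row seen yet
    first_value_step(&mut state, None); // first row is NULL
    first_value_step(&mut state, Some(2)); // subsequent rows are ignored
    assert_eq!(state, Some(None)); // result is NULL, matching the SLT example above
}
```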
19 changes: 0 additions & 19 deletions src/expr/impl/src/aggregate/general.rs
@@ -124,25 +124,6 @@ fn max<T: Ord>(state: T, input: T) -> T {
state.max(input)
}

#[aggregate("first_value(*) -> auto", state = "ref")]
fn first_value<T>(state: T, _: T) -> T {
state
}

#[aggregate("last_value(*) -> auto", state = "ref")]
fn last_value<T>(_: T, input: T) -> T {
input
}

#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)]
fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
if retract {
state
} else {
input
}
}

/// Note the following corner cases:
///
/// ```slt
1 change: 1 addition & 0 deletions src/expr/impl/src/aggregate/mod.rs
@@ -20,6 +20,7 @@ mod bit_or;
mod bit_xor;
mod bool_and;
mod bool_or;
mod first_last_value;
mod general;
mod jsonb_agg;
mod mode;
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/update.yaml
@@ -98,3 +98,10 @@
update t set a = a + 1;
expected_outputs:
- batch_distributed_plan
- name: update table with subquery in the set clause
sql: |
create table t1 (v1 int primary key, v2 int);
create table t2 (v1 int primary key, v2 int);
update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2);
expected_outputs:
- binder_error
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/update.yaml
@@ -165,3 +165,9 @@
└─BatchUpdate { table: t, exprs: [($0 + 1:Int32), $1, $2] }
└─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) }
└─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) }
- name: update table with subquery in the set clause
sql: |
create table t1 (v1 int primary key, v2 int);
create table t2 (v1 int primary key, v2 int);
update t1 set v1 = (select v1 from t2 where t1.v2 = t2.v2);
binder_error: 'Bind error: subquery on the right side of assignment is unsupported'
13 changes: 7 additions & 6 deletions src/frontend/src/binder/update.rs
@@ -17,7 +17,6 @@ use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{Schema, TableVersionId};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_sqlparser::ast::{Assignment, AssignmentValue, Expr, ObjectName, SelectItem};
@@ -129,15 +128,17 @@ impl Binder {
for Assignment { id, value } in assignments {
// FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`.
let assignments = match (id.as_slice(), value) {
// _ = (subquery)
(_ids, AssignmentValue::Expr(Expr::Subquery(_))) => {
return Err(ErrorCode::BindError(
"subquery on the right side of assignment is unsupported".to_owned(),
)
.into())
}
// col = expr
([id], value) => {
vec![(id.clone(), value)]
}

// (col1, col2) = (subquery)
(_ids, AssignmentValue::Expr(Expr::Subquery(_))) => {
bail_not_implemented!("subquery on the right side of multi-assignment");
}
// (col1, col2) = (expr1, expr2)
// TODO: support `DEFAULT` in multiple assignments
(ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id
Remaining file diffs are not rendered by default.
