Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_fix_compactor_memory_limit
  • Loading branch information
Li0k committed Feb 4, 2024
2 parents 8a538c1 + f7b5a15 commit afa5ad3
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 194 deletions.
166 changes: 84 additions & 82 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "7ba1c22" }
arrow-udf-wasm = "0.1"
arrow-udf-js = "0.1"
arrow-udf-wasm = { version = "0.1.2", features = ["build"] }
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
Expand Down
1 change: 1 addition & 0 deletions ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ RUN curl -sSL https://install.python-poetry.org | python3 -

# add required rustup components
RUN rustup component add rustfmt llvm-tools-preview clippy
RUN rustup target add wasm32-wasi

ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse

Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240124_1
export BUILD_ENV_VERSION=v20240204

export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
- mysql
- db
Expand All @@ -81,7 +81,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
- mysql
- db
Expand All @@ -93,12 +93,12 @@ services:
- ..:/risingwave

rw-build-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -109,7 +109,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
db:
condition: service_healthy
Expand Down
1 change: 0 additions & 1 deletion ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ source ci/scripts/common.sh

echo "--- Build Rust UDF"
cd e2e_test/udf/wasm
rustup target add wasm32-wasi
cargo build --release
cd ../../..

Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ COPY rust-toolchain rust-toolchain
RUN rustup self update \
&& rustup set profile minimal \
&& rustup show \
&& rustup component add rustfmt
&& rustup component add rustfmt \
&& rustup target add wasm32-wasi

RUN cargo install flamegraph
# TODO: cargo-chef doesn't work well now, because we update Cargo.lock very often.
Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ RUN unzip dashboard-artifact.zip && mv risingwave-dashboard-artifact /risingwave
RUN rustup self update \
&& rustup set profile minimal \
&& rustup show \
&& rustup component add rustfmt
&& rustup component add rustfmt \
&& rustup target add wasm32-wasi

RUN cargo fetch

Expand Down
21 changes: 21 additions & 0 deletions e2e_test/udf/wasm_udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,24 @@ drop function jsonb_access;

statement ok
drop function series;

# inlined rust function
statement ok
create function gcd(int, int) returns int language rust as $$
// Iterative Euclidean algorithm over 32-bit signed integers.
// Returns `a` unchanged when `b` is zero; the result can be negative
// when an input is negative, since `%` keeps the sign of the dividend.
fn gcd(mut a: i32, mut b: i32) -> i32 {
    while b != 0 {
        // Step the pair (a, b) to (b, a mod b) in one assignment.
        (a, b) = (b, a % b);
    }
    a
}
$$;

query I
select gcd(25, 15);
----
5

statement ok
drop function gcd;
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Build for UserDefinedFunction {

let identifier = udf.get_identifier()?;
let imp = match udf.language.as_str() {
"wasm" => {
"wasm" | "rust" => {
let link = udf.get_link()?;
// Use `block_in_place` as an escape hatch to run async code here in sync context.
// Calling `block_on` directly will panic.
Expand Down
70 changes: 54 additions & 16 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use risingwave_expr::expr::get_or_create_wasm_runtime;
use risingwave_object_store::object::{build_remote_object_store, ObjectStoreConfig};
use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction};
use risingwave_pb::catalog::Function;
use risingwave_sqlparser::ast::{
CreateFunctionBody, FunctionDefinition, ObjectName, OperateFunctionArg,
};
use risingwave_sqlparser::ast::{CreateFunctionBody, ObjectName, OperateFunctionArg};
use risingwave_storage::monitor::ObjectStoreMetrics;
use risingwave_udf::ArrowFlightUdfClient;

Expand Down Expand Up @@ -54,7 +52,7 @@ pub async fn handle_create_function(
Some(lang) => {
let lang = lang.real_value().to_lowercase();
match &*lang {
"python" | "java" | "wasm" | "javascript" => lang,
"python" | "java" | "wasm" | "rust" | "javascript" => lang,
_ => {
return Err(ErrorCode::InvalidParameterValue(format!(
"language {} is not supported",
Expand Down Expand Up @@ -134,12 +132,12 @@ pub async fn handle_create_function(
)
.into());
};
let Some(FunctionDefinition::SingleQuotedDef(id)) = params.as_ else {
let Some(as_) = params.as_ else {
return Err(
ErrorCode::InvalidParameterValue("AS must be specified".to_string()).into(),
);
};
identifier = id;
identifier = as_.into_string();

// check UDF server
{
Expand Down Expand Up @@ -173,16 +171,56 @@ pub async fn handle_create_function(
}
"javascript" => {
identifier = function_name.to_string();
body = Some(match params.as_ {
Some(FunctionDefinition::SingleQuotedDef(s)) => s,
Some(FunctionDefinition::DoubleDollarDef(s)) => s,
_ => {
return Err(ErrorCode::InvalidParameterValue(
"AS must be specified".to_string(),
)
.into())
}
});
body = Some(
params
.as_
.ok_or_else(|| ErrorCode::InvalidParameterValue("AS must be specified".into()))?
.into_string(),
);
}
"rust" => {
identifier = wasm_identifier(
&function_name,
&arg_types,
&return_type,
matches!(kind, Kind::Table(_)),
);
if params.using.is_some() {
return Err(ErrorCode::InvalidParameterValue(
"USING is not supported for rust function".to_string(),
)
.into());
}
let function_body = params
.as_
.ok_or_else(|| ErrorCode::InvalidParameterValue("AS must be specified".into()))?
.into_string();
let script = format!("#[arrow_udf::function(\"{identifier}\")]\n{function_body}");
body = Some(function_body.clone());

let wasm_binary =
tokio::task::spawn_blocking(move || arrow_udf_wasm::build::build("", &script))
.await?
.context("failed to build rust function")?;

// below is the same as `wasm` language
let runtime = arrow_udf_wasm::Runtime::new(&wasm_binary)?;
check_wasm_function(&runtime, &identifier)?;

let system_params = session.env().meta_client().get_system_params().await?;
let object_name = format!("{:?}.wasm", md5::compute(&wasm_binary));
upload_wasm_binary(
system_params.wasm_storage_url(),
&object_name,
wasm_binary.into(),
)
.await?;

link = Some(format!(
"{}/{}",
system_params.wasm_storage_url(),
object_name
));
}
"wasm" => {
identifier = wasm_identifier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;

Expand Down Expand Up @@ -50,20 +51,65 @@ impl EmergencyCompactionPicker {
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let unused_validator = Arc::new(CompactionTaskValidator::unused());

let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
let l0 = levels.l0.as_ref().unwrap();
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping as i32)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count > 0
})
.count();
// We trigger `EmergencyCompactionPicker` only when some unexpected condition causes the number of l0 levels to increase and the original strategy
// cannot compact that data to a lower level. But if most of these levels are overlapping levels, it is dangerous to compact the small data of non-overlapping sub-levels
// to the base level, because doing so costs a lot of compactor resources due to large write amplification.
if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
|| (self.config.split_weight_by_vnode > 0
&& partitioned_count > no_overlap_count
&& partitioned_count > overlapping_count)
{
return Some(ret);
let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
{
return Some(ret);
}
}
if self.config.split_weight_by_vnode > 0
&& no_overlap_count > partitioned_count
&& no_overlap_count > overlapping_count
{
let intral_level_compaction_picker =
WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());

if let Some(ret) = intral_level_compaction_picker.pick_whole_level(
levels.l0.as_ref().unwrap(),
&level_handlers[0],
self.config.split_weight_by_vnode,
stats,
) {
return Some(ret);
}
}
let mut tier_compaction_picker =
TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);

Expand Down
Loading

0 comments on commit afa5ad3

Please sign in to comment.