
Commit

Merge branch 'main' into kwannoel/sink-return
kwannoel authored Nov 28, 2023
2 parents 1c8ab05 + edfb9b9 commit b133901
Showing 15 changed files with 188 additions and 86 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
@@ -88,6 +88,7 @@ pkill python3

sqllogictest -p 4566 -d dev './e2e_test/udf/alter_function.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/retry_python.slt'

echo "--- e2e, $mode, java udf"
java -jar risingwave-udf-example.jar &
35 changes: 35 additions & 0 deletions e2e_test/udf/retry_python.slt
@@ -0,0 +1,35 @@
system ok
python3 e2e_test/udf/test.py &

# wait for server to start
sleep 1s

statement ok
CREATE FUNCTION sleep(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815';

# restart the server
system ok
pkill -9 python && python3 e2e_test/udf/test.py &

# query should not be affected
query I
select sleep(0);
----
0

# restart the server after 1s
system ok
sleep 1 && pkill -9 python && python3 e2e_test/udf/test.py &

# query should not be affected
query I
select sleep(2);
----
0

# close the server
system ok
pkill python

statement ok
DROP FUNCTION sleep;
4 changes: 2 additions & 2 deletions integration_tests/clickhouse-sink/create_source.sql
@@ -2,7 +2,7 @@ CREATE TABLE user_behaviors (
user_id INT,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
@@ -15,4 +15,4 @@ CREATE TABLE user_behaviors (
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '50'
) FORMAT PLAIN ENCODE JSON;
) FORMAT PLAIN ENCODE JSON;
1 change: 1 addition & 0 deletions src/common/heap_profiling/src/jeprof.rs
@@ -32,6 +32,7 @@ pub enum JeprofError {
},
}

/// Run `jeprof --collapsed` on the given profile.
pub async fn run(profile_path: String, collapsed_path: String) -> Result<(), JeprofError> {
let executable_path = env::current_exe()?;

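The new doc comment pins down what the helper does: invoke `jeprof --collapsed` on a dumped profile. The rest of the function is not shown in this hunk, so the following is only a hedged sketch of the described behavior; the `jeprof_collapsed` name, argument order, and output redirection are assumptions.

use std::env;
use std::fs::File;
use std::process::Stdio;

use tokio::process::Command;

// Hypothetical sketch: run `jeprof --collapsed <executable> <profile>` and
// write the collapsed stacks to `collapsed_path`.
async fn jeprof_collapsed(profile_path: &str, collapsed_path: &str) -> std::io::Result<()> {
    let executable = env::current_exe()?; // symbols come from the running binary
    let collapsed = File::create(collapsed_path)?;
    let status = Command::new("jeprof")
        .arg("--collapsed")
        .arg(&executable)
        .arg(profile_path)
        .stdout(Stdio::from(collapsed))
        .status()
        .await?;
    if !status.success() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("jeprof exited with {status}"),
        ));
    }
    Ok(())
}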
93 changes: 38 additions & 55 deletions src/common/heap_profiling/src/profiler.rs
@@ -18,19 +18,19 @@ use std::path::Path;

use parking_lot::Once;
use risingwave_common::config::HeapProfilingConfig;
use tikv_jemalloc_ctl::{
epoch as jemalloc_epoch, opt as jemalloc_opt, prof as jemalloc_prof, stats as jemalloc_stats,
};
use risingwave_common::util::resource_util;
use tikv_jemalloc_ctl::{opt as jemalloc_opt, prof as jemalloc_prof};
use tokio::time::{self, Duration};

use super::AUTO_DUMP_SUFFIX;

/// `HeapProfiler` automatically triggers heap profiling when memory usage is higher than the threshold.
///
/// To use it, both jemalloc's `opt.prof` and RisingWave's config `heap_profiling.enable_auto` must be set to true.
pub struct HeapProfiler {
config: HeapProfilingConfig,
threshold_auto_dump_heap_profile: usize,
jemalloc_dump_mib: jemalloc_prof::dump_mib,
jemalloc_allocated_mib: jemalloc_stats::allocated_mib,
jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib,
/// If jemalloc profiling is enabled
opt_prof: bool,
}
@@ -39,84 +39,67 @@ impl HeapProfiler {
/// # Arguments
///
/// `total_memory` must be the total available memory for the process.
/// It will be compared with jemalloc's allocated memory.
/// It will be compared with the process resident memory.
pub fn new(total_memory: usize, config: HeapProfilingConfig) -> Self {
let threshold_auto_dump_heap_profile =
(total_memory as f64 * config.threshold_auto as f64) as usize;
let jemalloc_dump_mib = jemalloc_prof::dump::mib().unwrap();
let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap();
let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap();
let opt_prof = jemalloc_opt::prof::read().unwrap();

Self {
config,
threshold_auto_dump_heap_profile,
jemalloc_dump_mib,
jemalloc_allocated_mib,
jemalloc_epoch_mib,
opt_prof,
}
}

fn dump_heap_prof(&self, cur_used_memory_bytes: usize, prev_used_memory_bytes: usize) {
if !self.config.enable_auto {
return;
}

if cur_used_memory_bytes > self.threshold_auto_dump_heap_profile
&& prev_used_memory_bytes <= self.threshold_auto_dump_heap_profile
{
if !self.opt_prof {
tracing::info!("Cannot dump heap profile because Jemalloc prof is not enabled");
return;
}
fn auto_dump_heap_prof(&self) {
let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
let file_name = format!("{}.{}", time_prefix, AUTO_DUMP_SUFFIX);

let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
let file_name = format!("{}.{}", time_prefix, AUTO_DUMP_SUFFIX);
let file_path = Path::new(&self.config.dir)
.join(&file_name)
.to_str()
.expect("file path is not valid utf8")
.to_owned();
let file_path_c = CString::new(file_path).expect("0 byte in file path");

let file_path = Path::new(&self.config.dir)
.join(&file_name)
.to_str()
.expect("file path is not valid utf8")
.to_owned();
let file_path_c = CString::new(file_path).expect("0 byte in file path");

// FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
if let Err(e) = self
.jemalloc_dump_mib
.write(unsafe { &*(file_path_c.as_c_str() as *const _) })
{
tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e);
} else {
tracing::info!("Successfully dumped heap profile to {}", file_name);
}
// FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
if let Err(e) = self
.jemalloc_dump_mib
.write(unsafe { &*(file_path_c.as_c_str() as *const _) })
{
tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e);
} else {
tracing::info!("Successfully dumped heap profile to {}", file_name);
}
}

fn advance_jemalloc_epoch(&self, prev_jemalloc_allocated_bytes: usize) -> usize {
if let Err(e) = self.jemalloc_epoch_mib.advance() {
tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
/// Start the daemon task of auto heap profiling.
pub fn start(self) {
if !self.config.enable_auto || !self.opt_prof {
tracing::info!("Auto memory dump is disabled.");
return;
}

self.jemalloc_allocated_mib.read().unwrap_or_else(|e| {
tracing::warn!("Jemalloc read allocated failed! {:?}", e);
prev_jemalloc_allocated_bytes
})
}

pub fn start(self) {
static START: Once = Once::new();
START.call_once(|| {
fs::create_dir_all(&self.config.dir).unwrap();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_millis(500));
let mut prev_jemalloc_allocated_bytes = 0;
let mut prev_used_memory_bytes = 0;
loop {
interval.tick().await;
let jemalloc_allocated_bytes =
self.advance_jemalloc_epoch(prev_jemalloc_allocated_bytes);
self.dump_heap_prof(jemalloc_allocated_bytes, prev_jemalloc_allocated_bytes);
prev_jemalloc_allocated_bytes = jemalloc_allocated_bytes;
let cur_used_memory_bytes = resource_util::memory::total_memory_used_bytes();

// Dump heap profile when memory usage is crossing the threshold.
if cur_used_memory_bytes > self.threshold_auto_dump_heap_profile
&& prev_used_memory_bytes <= self.threshold_auto_dump_heap_profile
{
self.auto_dump_heap_prof();
}
prev_used_memory_bytes = cur_used_memory_bytes;
}
});
})
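In short, the profiler now samples process resident memory every 500ms and dumps only on an upward threshold crossing, so a process that stays above the threshold does not dump on every tick. A self-contained sketch of that edge-trigger logic, with `ThresholdTrigger` as a hypothetical name for illustration:

// Dump fires only when usage crosses the threshold upward,
// not on every tick spent above it.
struct ThresholdTrigger {
    threshold: usize,
    prev: usize,
}

impl ThresholdTrigger {
    fn should_dump(&mut self, cur: usize) -> bool {
        let crossed = cur > self.threshold && self.prev <= self.threshold;
        self.prev = cur;
        crossed
    }
}

fn main() {
    let mut t = ThresholdTrigger { threshold: 100, prev: 0 };
    assert!(!t.should_dump(90));  // below threshold: no dump
    assert!(t.should_dump(120));  // upward crossing: dump once
    assert!(!t.should_dump(130)); // still above: no repeated dump
    assert!(!t.should_dump(80));  // back below: trigger re-arms
    assert!(t.should_dump(150));  // crosses again: dump
}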
4 changes: 2 additions & 2 deletions src/common/src/monitor/mod.rs
@@ -14,7 +14,7 @@

pub mod connection;
pub mod my_stats;
pub mod process_linux;
pub mod process;
pub mod rwlock;

use std::sync::LazyLock;
@@ -25,7 +25,7 @@ use prometheus::core::{
use prometheus::{Histogram, HistogramVec, Registry};

use crate::monitor::my_stats::MyHistogram;
use crate::monitor::process_linux::monitor_process;
use crate::monitor::process::monitor_process;

#[cfg(target_os = "linux")]
static PAGESIZE: std::sync::LazyLock<i64> =
File renamed without changes.
35 changes: 24 additions & 11 deletions src/connector/src/sink/clickhouse.rs
@@ -277,12 +277,12 @@ impl ClickHouseSink {
risingwave_common::types::DataType::Time => Err(SinkError::ClickHouse(
"clickhouse can not support Time".to_string(),
)),
risingwave_common::types::DataType::Timestamp => {
risingwave_common::types::DataType::Timestamp => Err(SinkError::ClickHouse(
"clickhouse does not have a type corresponding to naive timestamp".to_string(),
)),
risingwave_common::types::DataType::Timestamptz => {
Ok(ck_column.r#type.contains("DateTime64"))
}
risingwave_common::types::DataType::Timestamptz => Err(SinkError::ClickHouse(
"clickhouse can not support Timestamptz".to_string(),
)),
risingwave_common::types::DataType::Interval => Err(SinkError::ClickHouse(
"clickhouse can not support Interval".to_string(),
)),
@@ -422,6 +422,7 @@ impl ClickHouseSinkWriter {
/// `column_correct_vec`
fn build_column_correct_vec(ck_column: &SystemColumn) -> Result<ClickHouseSchemaFeature> {
let can_null = ck_column.r#type.contains("Nullable");
// `DateTime64` without precision is already displayed as `DateTime64(3)` in `system.columns`.
let accuracy_time = if ck_column.r#type.contains("DateTime64(") {
ck_column
.r#type
@@ -431,6 +432,9 @@
.split(')')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.split(',')
.next()
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?
} else {
@@ -696,16 +700,25 @@ impl ClickHouseFieldWithNull {
"clickhouse can not support Time".to_string(),
))
}
ScalarRefImpl::Timestamp(v) => {
let time = v.get_timestamp_nanos()
/ 10_i32.pow((9 - clickhouse_schema_feature.accuracy_time).into()) as i64;
ClickHouseField::Int64(time)
}
ScalarRefImpl::Timestamptz(_) => {
ScalarRefImpl::Timestamp(_) => {
return Err(SinkError::ClickHouse(
"clickhouse can not support Timestamptz".to_string(),
"clickhouse does not have a type corresponding to naive timestamp".to_string(),
))
}
ScalarRefImpl::Timestamptz(v) => {
let micros = v.timestamp_micros();
let ticks = match clickhouse_schema_feature.accuracy_time <= 6 {
true => {
micros / 10_i64.pow((6 - clickhouse_schema_feature.accuracy_time).into())
}
false => micros
.checked_mul(
10_i64.pow((clickhouse_schema_feature.accuracy_time - 6).into()),
)
.ok_or_else(|| SinkError::ClickHouse("DateTime64 overflow".to_string()))?,
};
ClickHouseField::Int64(ticks)
}
ScalarRefImpl::Jsonb(_) => {
return Err(SinkError::ClickHouse(
"clickhouse rust interface can not support Json".to_string(),
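The new `Timestamptz` branch converts microseconds since the epoch into `DateTime64` ticks of `10^-p` seconds: it divides when the column's precision `p` is at most 6 and multiplies (with an overflow check) when the column is finer than microseconds. A standalone sketch of the same arithmetic, with `precision` standing in for `accuracy_time` and assumed to be in `0..=9`:

// Convert microseconds since the epoch into DateTime64(precision) ticks.
fn micros_to_ticks(micros: i64, precision: u8) -> Option<i64> {
    if precision <= 6 {
        // Column is coarser than microseconds: drop the extra digits.
        Some(micros / 10_i64.pow((6 - precision).into()))
    } else {
        // Column is finer than microseconds: scale up, guarding against i64 overflow.
        micros.checked_mul(10_i64.pow((precision - 6).into()))
    }
}

fn main() {
    let micros = 1_701_129_600_000_000; // 2023-11-28T00:00:00Z in microseconds
    assert_eq!(micros_to_ticks(micros, 3), Some(1_701_129_600_000)); // DateTime64(3): ms
    assert_eq!(micros_to_ticks(micros, 6), Some(micros)); // DateTime64(6): us
    assert_eq!(micros_to_ticks(micros, 9), Some(1_701_129_600_000_000_000)); // DateTime64(9): ns
}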
22 changes: 21 additions & 1 deletion src/ctl/src/cmd_impl/scale/resize.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::ops::Sub;
use std::process::exit;

use inquire::Confirm;
@@ -60,6 +61,7 @@ impl From<ScaleHorizonCommands> for ScaleCommandContext {
yes,
fragments,
target_parallelism_per_worker: None,
exclusive_for_vertical: false,
}
}
}
@@ -76,6 +78,7 @@ impl From<ScaleVerticalCommands> for ScaleCommandContext {
yes,
fragments,
},
exclusive,
} = value;

Self {
@@ -87,6 +90,7 @@ impl From<ScaleVerticalCommands> for ScaleCommandContext {
yes,
fragments,
target_parallelism_per_worker,
exclusive_for_vertical: exclusive,
}
}
}
@@ -100,6 +104,7 @@ pub struct ScaleCommandContext {
yes: bool,
fragments: Option<Vec<u32>>,
target_parallelism_per_worker: Option<u32>,
exclusive_for_vertical: bool,
}

pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
@@ -191,10 +196,11 @@ pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
output,
yes,
fragments,
exclusive_for_vertical,
} = scale_ctx;

let worker_changes = {
let exclude_worker_ids =
let mut exclude_worker_ids =
worker_input_to_worker_ids(exclude_workers.unwrap_or_default(), false);
let include_worker_ids =
worker_input_to_worker_ids(include_workers.unwrap_or_default(), true);
@@ -231,6 +237,20 @@ pub async fn resize(ctl_ctx: &CtlContext, scale_ctx: ScaleCommandContext) -> anyhow::Result<()> {
}
}

if exclusive_for_vertical {
let all_worker_ids: HashSet<_> =
streaming_workers_index_by_id.keys().cloned().collect();

let include_worker_id_set: HashSet<_> = include_worker_ids.iter().cloned().collect();
let generated_exclude_worker_ids = all_worker_ids.sub(&include_worker_id_set);

exclude_worker_ids = exclude_worker_ids
.into_iter()
.chain(generated_exclude_worker_ids)
.unique()
.collect();
}

WorkerChanges {
include_worker_ids,
exclude_worker_ids,
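With the new `exclusive` flag (`--exclusive` in the CLI changes below), every streaming worker that is not explicitly included is appended to the exclude list, so the target parallelism applies only to the chosen workers. A minimal standalone sketch of that set arithmetic, using a std-only stand-in for itertools' `unique`; the `exclusive_excludes` name is hypothetical:

use std::collections::HashSet;

// Everything not explicitly included becomes excluded; duplicates are
// removed while preserving order.
fn exclusive_excludes(
    all_workers: &HashSet<u32>,
    include: &[u32],
    mut exclude: Vec<u32>,
) -> Vec<u32> {
    let include_set: HashSet<u32> = include.iter().copied().collect();
    exclude.extend(all_workers - &include_set);
    let mut seen = HashSet::new();
    exclude.retain(|w| seen.insert(*w));
    exclude
}

fn main() {
    let all: HashSet<u32> = [1, 2, 3, 4].into_iter().collect();
    let excludes = exclusive_excludes(&all, &[2], vec![3]);
    assert!(!excludes.contains(&2)); // the included worker is never excluded
    assert_eq!(excludes.len(), 3); // workers 1, 3, 4 appear exactly once
}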
4 changes: 4 additions & 0 deletions src/ctl/src/lib.rs
@@ -348,6 +348,10 @@ pub struct ScaleVerticalCommands {
/// The target parallelism per worker, requires `workers` to be set.
#[clap(long, required = true)]
target_parallelism_per_worker: Option<u32>,

/// It will exclude all other workers to maintain the target parallelism only for the target workers.
#[clap(long, default_value_t = false)]
exclusive: bool,
}

#[derive(Subcommand, Debug)]
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/expr_udf.rs
@@ -100,7 +100,7 @@ impl UdfExpression {

let output = self
.client
.call(&self.identifier, input)
.call_with_retry(&self.identifier, input)
.instrument_await(self.span.clone())
.await?;
if output.num_rows() != vis.count_ones() {
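The expression now calls `call_with_retry` instead of `call`, which is what lets the new `retry_python.slt` test survive a UDF server restart. The client side is not shown in this diff, so the following is only a hedged sketch of what such a wrapper could look like; the attempt count and backoff schedule (3 attempts, doubling delay) are invented for illustration.

use std::future::Future;
use std::time::Duration;

// Retry a fallible async call a fixed number of times with exponential backoff,
// returning the last error if every attempt fails.
async fn with_retry<T, E, F, Fut>(mut call: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    let mut last_err = None;
    for attempt in 0..3 {
        match call().await {
            Ok(v) => return Ok(v),
            Err(e) => last_err = Some(e),
        }
        if attempt < 2 {
            tokio::time::sleep(delay).await; // back off before the next attempt
            delay *= 2;
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}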
(Diffs for the remaining changed files are not rendered.)
