Skip to content

Commit

Permalink
Merge branch 'main' into xxh/bench-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jan 4, 2024
2 parents 73f543f + 58e1326 commit 3fdd3e9
Show file tree
Hide file tree
Showing 46 changed files with 583 additions and 166 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions e2e_test/source/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: missing field `properties.bootstrap.server`
2: failed to create source worker
3: missing field `properties.bootstrap.server`


statement error
Expand All @@ -37,7 +38,8 @@ db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: Unknown fields in the WITH clause: {"unknown_field": "1"}
2: failed to create source worker
3: Unknown fields in the WITH clause: {"unknown_field": "1"}


statement error topic invalid_topic not found
Expand Down
30 changes: 14 additions & 16 deletions e2e_test/streaming/over_window/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ set rw_streaming_over_window_cache_policy = full;

include ./generated/main.slt.part

# TODO(rc): The following tests are temporarily commented out because of recovery test failure.

#statement ok
#set rw_streaming_over_window_cache_policy = recent;
#
#include ./generated/main.slt.part
#
#statement ok
#set rw_streaming_over_window_cache_policy = recent_first_n;
#
#include ./generated/main.slt.part
#
#statement ok
#set rw_streaming_over_window_cache_policy = recent_last_n;
#
#include ./generated/main.slt.part
statement ok
set rw_streaming_over_window_cache_policy = recent;

include ./generated/main.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_first_n;

include ./generated/main.slt.part

statement ok
set rw_streaming_over_window_cache_policy = recent_last_n;

include ./generated/main.slt.part

statement ok
set rw_streaming_over_window_cache_policy = default;
8 changes: 7 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ def section_compaction(outer_panels):
[
panels.target(
f"avg({metric('storage_compact_task_pending_num')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_split_count - {{%s}} @ {{%s}}"
"compactor_task_count - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),

panels.target(
f"avg({metric('storage_compact_task_pending_parallelism')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_pending_parallelism - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ message CompactTask {
PENDING = 1;
SUCCESS = 2;
HEARTBEAT_CANCELED = 3;
NO_AVAIL_RESOURCE_CANCELED = 4;
NO_AVAIL_MEMORY_RESOURCE_CANCELED = 4;
ASSIGN_FAIL_CANCELED = 5;
SEND_FAIL_CANCELED = 6;
MANUAL_CANCELED = 7;
Expand All @@ -294,6 +294,7 @@ message CompactTask {
EXECUTE_FAILED = 10;
JOIN_HANDLE_FAILED = 11;
TRACK_SST_OBJECT_ID_FAILED = 12;
NO_AVAIL_CPU_RESOURCE_CANCELED = 13;
}
// SSTs to be compacted, which will be removed from LSM after compaction
repeated InputLevel input_ssts = 1;
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub use update::*;
pub use utils::*;
pub use values::*;

use self::test_utils::{BlockExecutorBuidler, BusyLoopExecutorBuidler};
use self::test_utils::{BlockExecutorBuilder, BusyLoopExecutorBuilder};
use crate::error::Result;
use crate::executor::sys_row_seq_scan::SysRowSeqScanExecutorBuilder;
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
Expand Down Expand Up @@ -235,8 +235,8 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::SortOverWindow => SortOverWindowExecutor,
NodeBody::MaxOneRow => MaxOneRowExecutor,
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuidler,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuidler,
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
}
.await?;

Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ impl LookupExecutorBuilder for FakeInnerSideExecutorBuilder {
}
}

pub struct BlockExecutorBuidler {}
pub struct BlockExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for BlockExecutorBuidler {
impl BoxedExecutorBuilder for BlockExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
_source: &ExecutorBuilder<'_, C>,
_inputs: Vec<BoxedExecutor>,
Expand Down Expand Up @@ -374,10 +374,10 @@ impl BlockExecutor {
}
}

pub struct BusyLoopExecutorBuidler {}
pub struct BusyLoopExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for BusyLoopExecutorBuidler {
impl BoxedExecutorBuilder for BusyLoopExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
_source: &ExecutorBuilder<'_, C>,
_inputs: Vec<BoxedExecutor>,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::executor::{
};
use crate::task::BatchTaskContext;

/// [`UpdateExecutor`] implements table updation with values from its child executor and given
/// [`UpdateExecutor`] implements table update with values from its child executor and given
/// expressions.
// Note: multiple `UPDATE`s in a single epoch, or concurrent `UPDATE`s may lead to conflicting
// records. This is validated and filtered on the first `Materialize`.
Expand Down
2 changes: 1 addition & 1 deletion src/common/proc_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn derive_estimate_size(input: TokenStream) -> TokenStream {
// that we can manipulate
let ast: syn::DeriveInput = syn::parse(input).unwrap();

// The name of the sruct.
// The name of the struct.
let name = &ast.ident;

// Extract all generics we shall ignore.
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ pub mod default {
}

pub fn compactor_max_task_multiplier() -> f32 {
1.5000
2.5000
}

pub fn compactor_memory_available_proportion() -> f64 {
Expand Down
13 changes: 11 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,26 @@ pub async fn compute_node_serve(
let memory_limiter = Arc::new(MemoryLimiter::new(
storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
));

let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
let max_task_parallelism = Arc::new(AtomicU32::new(
(compaction_executor.worker_num() as f32
* storage_opts.compactor_max_task_multiplier)
.ceil() as u32,
));

let compactor_context = CompactorContext {
storage_opts,
sstable_store: storage.sstable_store(),
compactor_metrics: compactor_metrics.clone(),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
compaction_executor,
memory_limiter,

task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
running_task_parallelism: Arc::new(AtomicU32::new(0)),
max_task_parallelism,
};

let (handle, shutdown_sender) = start_compactor(
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ imm_merge_threshold = 4
write_conflict_detection_enabled = true
disable_remote_compactor = false
share_buffer_upload_concurrency = 8
compactor_max_task_multiplier = 1.5
compactor_max_task_multiplier = 2.5
compactor_memory_available_proportion = 0.8
sstable_id_remote_fetch_number = 10
min_sst_size_for_streaming_upload = 33554432
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub struct DorisSinkWriter {
pub config: DorisConfig,
schema: Schema,
pk_indices: Vec<usize>,
inseter_inner_builder: InserterInnerBuilder,
inserter_inner_builder: InserterInnerBuilder,
is_append_only: bool,
client: Option<DorisClient>,
row_encoder: JsonEncoder,
Expand Down Expand Up @@ -290,7 +290,7 @@ impl DorisSinkWriter {
config,
schema: schema.clone(),
pk_indices,
inseter_inner_builder: doris_insert_builder,
inserter_inner_builder: doris_insert_builder,
is_append_only,
client: None,
row_encoder: JsonEncoder::new_with_doris(
Expand Down Expand Up @@ -379,7 +379,7 @@ impl DorisSinkWriter {
impl SinkWriter for DorisSinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
if self.client.is_none() {
self.client = Some(DorisClient::new(self.inseter_inner_builder.build().await?));
self.client = Some(DorisClient::new(self.inserter_inner_builder.build().await?));
}
if self.is_append_only {
self.append_only(chunk).await
Expand Down
39 changes: 18 additions & 21 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::ops::Deref;
use std::pin::pin;
use std::time::Instant;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use futures::future::select;
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -149,11 +149,12 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
}

async fn validate(&self) -> Result<()> {
validate_remote_sink(&self.param).await
validate_remote_sink(&self.param).await?;
Ok(())
}
}

async fn validate_remote_sink(param: &SinkParam) -> Result<()> {
async fn validate_remote_sink(param: &SinkParam) -> anyhow::Result<()> {
// FIXME: support struct and array in stream sink
param.columns.iter().map(|col| {
if matches!(
Expand Down Expand Up @@ -189,43 +190,36 @@ async fn validate_remote_sink(param: &SinkParam) -> Result<()> {
let sink_param = param.to_proto();

spawn_blocking(move || {
let mut env = jvm
.attach_current_thread()
.map_err(|err| SinkError::Internal(err.into()))?;
let mut env = jvm.attach_current_thread()?;
let validate_sink_request = ValidateSinkRequest {
sink_param: Some(sink_param),
};
let validate_sink_request_bytes = env
.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))
.map_err(|err| SinkError::Internal(err.into()))?;
let validate_sink_request_bytes =
env.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))?;

let validate_sink_response_bytes = call_static_method!(
env,
{com.risingwave.connector.JniSinkValidationHandler},
{byte[] validate(byte[] validateSourceRequestBytes)},
&validate_sink_request_bytes
)
.map_err(|err| SinkError::Internal(err.into()))?;
)?;

let validate_sink_response: ValidateSinkResponse = Message::decode(
risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env)
.map_err(|err| SinkError::Internal(err.into()))?
.deref(),
)
.map_err(|err| SinkError::Internal(err.into()))?;
risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env)?.deref(),
)?;

validate_sink_response.error.map_or_else(
|| Ok(()), // If there is no error message, return Ok here.
|err| {
Err(SinkError::Remote(anyhow!(format!(
Err(anyhow!(format!(
"sink cannot pass validation: {}",
err.error_message
))))
)))
},
)
})
.await
.map_err(|e| anyhow!("unable to validate: {:?}", e))?
.context("JoinHandle returns error")?
}

pub struct RemoteLogSinker {
Expand Down Expand Up @@ -457,7 +451,8 @@ impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
const SINK_NAME: &'static str = R::SINK_NAME;

async fn validate(&self) -> Result<()> {
validate_remote_sink(&self.param).await
validate_remote_sink(&self.param).await?;
Ok(())
}

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Expand Down Expand Up @@ -632,7 +627,9 @@ struct EmbeddedConnectorClient {

impl EmbeddedConnectorClient {
fn new() -> Result<Self> {
let jvm = JVM.get_or_init()?;
let jvm = JVM
.get_or_init()
.context("failed to create EmbeddedConnectorClient")?;
Ok(EmbeddedConnectorClient { jvm })
}

Expand Down
4 changes: 1 addition & 3 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
let source_type = conn_props.get_source_type_pb();
let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let jvm = JVM
.get_or_init()
.map_err(|e| anyhow!("jvm not initialized properly: {:?}", e))?;
let jvm = JVM.get_or_init()?;

let get_event_stream_request = GetEventStreamRequest {
source_id,
Expand Down
33 changes: 19 additions & 14 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,30 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
println!("Table #{}", table_fragment.table_id);

for fragment in table_fragment.fragments.values() {
if fragment.fragment_type_mask & FragmentTypeFlag::Source as u32 == 0 {
let fragment_type_mask = fragment.fragment_type_mask;
if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
|| fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0
{
// skip dummy source for dml fragment
continue;
}

println!("\tFragment #{}", fragment.fragment_id);
for actor in &fragment.actors {
let ConnectorSplits { splits } = actor_splits.remove(&actor.actor_id).unwrap();
let splits = splits
.iter()
.map(|split| SplitImpl::try_from(split).unwrap())
.map(|split| split.id())
.collect_vec();

println!(
"\t\tActor #{:<3} ({}): [{}]",
actor.actor_id,
splits.len(),
splits.join(",")
);
if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
let splits = splits
.iter()
.map(|split| SplitImpl::try_from(split).unwrap())
.map(|split| split.id())
.collect_vec();

println!(
"\t\tActor #{:<3} ({}): [{}]",
actor.actor_id,
splits.len(),
splits.join(",")
);
}
}
}
}
Expand Down
Loading

0 comments on commit 3fdd3e9

Please sign in to comment.