Commit 451f2f1: resolve conflicts

chenzl25 committed Jul 9, 2024
2 parents 64f72f9 + 607a2af

Showing 52 changed files with 234 additions and 784 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -77,7 +77,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.10.0", features = ["nightly", "mtrace"] }
foyer = { version = "0.10.1", features = ["nightly", "mtrace"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
4 changes: 2 additions & 2 deletions ci/scripts/docker-hdfs.sh
@@ -10,10 +10,10 @@ BUILDKITE_COMMIT="HDFS_$(echo $RANDOM | md5sum | head -c 20;)"

arch=$(uname -m)
if [ "$arch" = "arm64" ] || [ "$arch" = "aarch64" ]; then
java_home_path="/usr/lib/jvm/java-11-openjdk-arm64"
java_home_path="/usr/lib/jvm/java-17-openjdk-arm64"
else
# x86_64
java_home_path="/usr/lib/jvm/java-11-openjdk-amd64"
java_home_path="/usr/lib/jvm/java-17-openjdk-amd64"
fi
echo $java_home_path

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 8 additions & 12 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1232,21 +1232,17 @@ def section_streaming_actors(outer_panels):
),
],
),
-            panels.timeseries_actor_latency(
-                "Executor Barrier Align",
+            panels.timeseries_percentage(
+                "Executor Barrier Align Per Second",
                 "",
                 [
-                    *quantile(
-                        lambda quantile, legend: panels.target(
-                            f"histogram_quantile({quantile}, sum(rate({metric('stream_barrier_align_duration_bucket')}[$__rate_interval])) by (le, executor, fragment_id, wait_side, {COMPONENT_LABEL}))",
-                            f"p{legend} - executor {{{{executor}}}} fragment {{{{fragment_id}}}} {{{{wait_side}}}} - {{{{{COMPONENT_LABEL}}}}}",
-                        ),
-                        [90, 99, 999, "max"],
-                    ),
                     panels.target(
-                        f"sum by(le, executor, fragment_id, wait_side, job)(rate({metric('stream_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,executor,fragment_id,wait_side,{COMPONENT_LABEL}) (rate({metric('stream_barrier_align_duration_count')}[$__rate_interval])) > 0",
-                        "avg - executor {{executor}} fragment {{fragment_id}} {{wait_side}} - {{%s}}"
-                        % COMPONENT_LABEL,
+                        f"avg(rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000) by (fragment_id,wait_side, executor)",
+                        "fragment {{fragment_id}} {{wait_side}} {{executor}}",
+                    ),
+                    panels.target_hidden(
+                        f"rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000",
+                        "actor {{actor_id}} fragment {{fragment_id}} {{wait_side}} {{executor}}",
                     ),
                 ],
             ),
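Note on the panel change above: judging by its name and the division by 1000000000, stream_barrier_align_duration_ns is a monotonic counter of nanoseconds spent waiting for barrier alignment. After template expansion the new primary target is of the form

    avg(rate(stream_barrier_align_duration_ns[$__rate_interval]) / 1000000000) by (fragment_id, wait_side, executor)

so rate()/1e9 yields seconds waited per wall-clock second, i.e. a 0-1 alignment ratio per fragment/executor. That is presumably why the panel switches from timeseries_actor_latency (histogram quantiles over the now-dropped _bucket histogram) to timeseries_percentage.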
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

19 changes: 4 additions & 15 deletions risedev.yml
@@ -296,27 +296,14 @@ profile:
        port: 15690
        dashboard-port: 15691
        exporter-port: 11250
+        meta-backend: etcd
      - use: meta-node
        port: 25690
        dashboard-port: 25691
        exporter-port: 21250
+        meta-backend: etcd
      - use: compactor

-  3meta:
-    steps:
-      - use: meta-node
-        port: 5690
-        dashboard-port: 5691
-        exporter-port: 1250
-      - use: meta-node
-        port: 15690
-        dashboard-port: 15691
-        exporter-port: 11250
-      - use: meta-node
-        port: 25690
-        dashboard-port: 25691
-        exporter-port: 21250
-
  3etcd-3meta-1cn-1fe:
    steps:
      - use: minio
@@ -344,10 +331,12 @@ profile:
        port: 15690
        dashboard-port: 15691
        exporter-port: 11250
+        meta-backend: etcd
      - use: meta-node
        port: 25690
        dashboard-port: 25691
        exporter-port: 21250
+        meta-backend: etcd
      - use: compactor
      - use: compute-node
      - use: frontend
10 changes: 2 additions & 8 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -156,18 +156,12 @@ impl WorkerNodeManager {

let guard = self.inner.read().unwrap();

-        let worker_slot_index: HashMap<_, _> = guard
-            .worker_nodes
-            .iter()
-            .flat_map(|worker| {
-                (0..worker.parallelism()).map(move |i| (WorkerSlotId::new(worker.id, i), worker))
-            })
-            .collect();
+        let worker_index: HashMap<_, _> = guard.worker_nodes.iter().map(|w| (w.id, w)).collect();

        let mut workers = Vec::with_capacity(worker_slot_ids.len());

        for worker_slot_id in worker_slot_ids {
-            match worker_slot_index.get(worker_slot_id) {
+            match worker_index.get(&worker_slot_id.worker_id()) {
                Some(worker) => workers.push((*worker).clone()),
                None => bail!(
                    "No worker node found for worker slot id: {}",
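Note on the refactor above: the old code built one index entry per (worker, slot) pair, while the new code indexes workers by id and resolves each slot through WorkerSlotId::worker_id(). A minimal sketch of the new lookup shape, using simplified stand-in types rather than the real RisingWave definitions:

    use std::collections::HashMap;

    struct WorkerNode {
        id: u32, // stand-in for the real worker node type
    }

    // Stand-in: per mapping.rs below, WorkerSlotId wraps a u64 and exposes worker_id().
    struct WorkerSlotId(u64);
    impl WorkerSlotId {
        fn worker_id(&self) -> u32 {
            (self.0 >> 32) as u32 // assumed bit layout: worker id in the high 32 bits
        }
    }

    /// Resolve each slot to its owning worker; None models the bail!() path.
    fn resolve<'a>(
        workers: &'a [WorkerNode],
        slots: &[WorkerSlotId],
    ) -> Option<Vec<&'a WorkerNode>> {
        // One map entry per worker instead of one per (worker, slot) pair.
        let index: HashMap<u32, &WorkerNode> = workers.iter().map(|w| (w.id, w)).collect();
        slots.iter().map(|s| index.get(&s.worker_id()).copied()).collect()
    }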
2 changes: 1 addition & 1 deletion src/common/common_service/Cargo.toml
@@ -28,7 +28,7 @@ thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tower-http = { version = "0.5", features = ["add-extension"] }
tower-http = { version = "0.5", features = ["add-extension", "compression-gzip"] }
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
41 changes: 22 additions & 19 deletions src/common/common_service/src/metrics_manager.rs
@@ -23,6 +23,7 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use thiserror_ext::AsReport;
use tokio::net::TcpListener;
use tower_http::add_extension::AddExtensionLayer;
+use tower_http::compression::CompressionLayer;
use tracing::{error, info, warn};

pub struct MetricsManager {}
@@ -31,28 +32,30 @@ impl MetricsManager {
pub fn boot_metrics_service(listen_addr: String) {
static METRICS_SERVICE_LISTEN_ADDR: OnceLock<String> = OnceLock::new();
let new_listen_addr = listen_addr.clone();
-        let current_listen_addr =
-            METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
-                let listen_addr_clone = listen_addr.clone();
-                #[cfg(not(madsim))] // no need in simulation test
-                tokio::spawn(async move {
-                    info!(
-                        "Prometheus listener for Prometheus is set up on http://{}",
-                        listen_addr
-                    );
+        let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
+            let listen_addr_clone = listen_addr.clone();
+            #[cfg(not(madsim))] // no need in simulation test
+            tokio::spawn(async move {
+                info!(
+                    "Prometheus listener for Prometheus is set up on http://{}",
+                    listen_addr
+                );

-                    let service = Router::new().fallback(Self::metrics_service).layer(
-                        AddExtensionLayer::new(GLOBAL_METRICS_REGISTRY.deref().clone()),
-                    );
+                let service = Router::new()
+                    .fallback(Self::metrics_service)
+                    .layer(AddExtensionLayer::new(
+                        GLOBAL_METRICS_REGISTRY.deref().clone(),
+                    ))
+                    .layer(CompressionLayer::new());

-                    let serve_future =
-                        axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
-                    if let Err(err) = serve_future.await {
-                        error!(error = %err.as_report(), "metrics service exited with error");
-                    }
-                });
-                listen_addr_clone
+                let serve_future =
+                    axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
+                if let Err(err) = serve_future.await {
+                    error!(error = %err.as_report(), "metrics service exited with error");
+                }
+            });
+            listen_addr_clone
});
if new_listen_addr != *current_listen_addr {
warn!(
"unable to listen port {} for metrics service. Currently listening on {}",
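The newly added CompressionLayer (backed by the compression-gzip feature enabled in Cargo.toml above) only compresses responses when the scraper advertises a matching Accept-Encoding, so plain clients still receive uncompressed text. A minimal standalone sketch of the same layering, assuming axum 0.7 and tower-http 0.5 as used in this diff (the handler body and port are illustrative):

    use axum::{routing::get, Router};
    use tower_http::compression::CompressionLayer;

    #[tokio::main]
    async fn main() {
        // Encoding is negotiated per request from Accept-Encoding; /metrics
        // payloads are large and highly repetitive, so gzip compresses well.
        let app = Router::new()
            .route("/metrics", get(|| async { "# metrics exposition here" }))
            .layer(CompressionLayer::new());

        let listener = tokio::net::TcpListener::bind("127.0.0.1:1250").await.unwrap();
        axum::serve(listener, app).await.unwrap();
    }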
30 changes: 30 additions & 0 deletions src/common/src/config.rs
@@ -851,6 +851,9 @@ pub struct FileCacheConfig {
#[serde(default = "default::file_cache::compression")]
pub compression: String,

#[serde(default = "default::file_cache::flush_buffer_threshold_mb")]
pub flush_buffer_threshold_mb: Option<usize>,

#[serde(default, flatten)]
#[config_doc(omitted)]
pub unrecognized: Unrecognized<Self>,
@@ -1591,6 +1594,14 @@ pub mod default {
pub fn table_info_statistic_history_times() -> usize {
240
}
+
+        pub fn block_file_cache_flush_buffer_threshold_mb() -> usize {
+            256
+        }
+
+        pub fn meta_file_cache_flush_buffer_threshold_mb() -> usize {
+            64
+        }
}

pub mod streaming {
@@ -1651,6 +1662,10 @@ pub mod default {
pub fn compression() -> String {
"none".to_string()
}
+
+        pub fn flush_buffer_threshold_mb() -> Option<usize> {
+            None
+        }
}

pub mod cache_refill {
Expand Down Expand Up @@ -2123,6 +2138,8 @@ pub struct StorageMemoryConfig {
pub prefetch_buffer_capacity_mb: usize,
pub block_cache_eviction_config: EvictionConfig,
pub meta_cache_eviction_config: EvictionConfig,
+    pub block_file_cache_flush_buffer_threshold_mb: usize,
+    pub meta_file_cache_flush_buffer_threshold_mb: usize,
}

pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
@@ -2234,6 +2251,17 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
}
});

+    let block_file_cache_flush_buffer_threshold_mb = s
+        .storage
+        .data_file_cache
+        .flush_buffer_threshold_mb
+        .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb());
+    let meta_file_cache_flush_buffer_threshold_mb = s
+        .storage
+        .meta_file_cache
+        .flush_buffer_threshold_mb
+        .unwrap_or(default::storage::meta_file_cache_flush_buffer_threshold_mb());

StorageMemoryConfig {
block_cache_capacity_mb,
block_cache_shard_num,
@@ -2244,6 +2272,8 @@
prefetch_buffer_capacity_mb,
block_cache_eviction_config,
meta_cache_eviction_config,
+        block_file_cache_flush_buffer_threshold_mb,
+        meta_file_cache_flush_buffer_threshold_mb,
}
}

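The new knob follows the config crate's usual optional-override pattern: the serde default leaves the field as None, and the effective value is resolved at extraction time with unwrap_or. A self-contained sketch of that pattern (type and function names here are illustrative, not the real config definitions):

    use serde::Deserialize;

    #[derive(Deserialize, Default)]
    struct FileCacheSection {
        // Absent in the TOML => None => the compiled-in default applies later.
        #[serde(default)]
        flush_buffer_threshold_mb: Option<usize>,
    }

    fn effective_threshold_mb(section: &FileCacheSection, compiled_default: usize) -> usize {
        section.flush_buffer_threshold_mb.unwrap_or(compiled_default)
    }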
8 changes: 7 additions & 1 deletion src/common/src/hash/consistent_hash/mapping.rs
@@ -33,7 +33,7 @@ use crate::util::iter_util::ZipEqDebug;
// TODO: find a better place for this.
pub type ActorId = u32;

-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
@@ -68,6 +68,12 @@ impl Display for WorkerSlotId {
}
}

+impl Debug for WorkerSlotId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
+    }
+}

/// Trait for items that can be used as keys in [`VnodeMapping`].
pub trait VnodeMappingItem {
/// The type of the item.
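The hand-written Debug impl above makes composite debug output compact. Assuming the u64 packs the worker id above the slot index (consistent with worker_id()/slot_idx() and the WorkerSlotId::new(worker.id, i) call removed in worker_node_manager.rs), a derived impl would have printed the raw integer (e.g. WorkerSlotId(12884901889) for worker 3, slot 1), whereas now:

    println!("{:?}", WorkerSlotId::new(3, 1)); // prints: [3:1]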
20 changes: 20 additions & 0 deletions src/compute/src/memory/config.rs
@@ -168,6 +168,24 @@ pub fn storage_memory_config(
((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20,
);

+    // The file cache flush buffer threshold is used as an emergency limitation.
+    // In most cases the flush buffer is not supposed to be as large as the threshold.
+    // So the file cache flush buffer threshold is not counted in the memory usage calculation.
+    let block_file_cache_flush_buffer_threshold_mb = storage_config
+        .data_file_cache
+        .flush_buffer_threshold_mb
+        .unwrap_or(
+            risingwave_common::config::default::storage::block_file_cache_flush_buffer_threshold_mb(
+            ),
+        );
+    let meta_file_cache_flush_buffer_threshold_mb = storage_config
+        .meta_file_cache
+        .flush_buffer_threshold_mb
+        .unwrap_or(
+            risingwave_common::config::default::storage::meta_file_cache_flush_buffer_threshold_mb(
+            ),
+        );

let total_calculated_mb = block_cache_capacity_mb
+ meta_cache_capacity_mb
+ shared_buffer_capacity_mb
@@ -276,6 +294,8 @@ pub fn storage_memory_config(
prefetch_buffer_capacity_mb,
block_cache_eviction_config,
meta_cache_eviction_config,
+        block_file_cache_flush_buffer_threshold_mb,
+        meta_file_cache_flush_buffer_threshold_mb,
}
}

1 change: 0 additions & 1 deletion src/connector/Cargo.toml
@@ -93,7 +93,6 @@ opendal = { version = "0.47", features = [
] }
openssl = "0.10"
parking_lot = { workspace = true }
-parquet = { workspace = true }
paste = "1"
pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
postgres-openssl = "0.5.0"
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
@@ -751,7 +751,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
            // report to prometheus
            GLOBAL_SOURCE_METRICS
                .direct_cdc_event_lag_latency
-                .with_label_values(&[&msg_meta.full_table_name])
+                .with_guarded_label_values(&[&msg_meta.full_table_name])
.observe(lag_ms as f64);
}

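The switch from with_label_values to with_guarded_label_values (here and in the CDC reader below) is call-compatible at the use site. The guarded variant appears to hand back a guard-wrapped child so that label combinations such as the per-table full_table_name can be dropped from the metric vec when no longer held, instead of accumulating forever; that semantic is an assumption about RisingWave's guarded-metrics wrapper, and vec/table below are hypothetical stand-ins:

    // Plain prometheus API: the labeled child stays registered indefinitely.
    vec.with_label_values(&[&table]).observe(lag_ms as f64);

    // Guarded variant (assumed semantics): the labeled series can be
    // deregistered once all guards are dropped, bounding label cardinality.
    vec.with_guarded_label_values(&[&table]).observe(lag_ms as f64);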
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/source/reader.rs
@@ -217,7 +217,7 @@ impl<T: CdcSourceTypeTrait> CdcSplitReader<T> {
tracing::trace!("receive {} cdc events ", events.len());
metrics
.connector_source_rows_received
-                .with_label_values(&[source_type.as_str_name(), &source_id])
+                .with_guarded_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;