Merge remote-tracking branch 'origin/main' into dylan/support_create_iceberg_source
chenzl25 committed Feb 4, 2024
2 parents b5423ea + 5f741c6 commit 1cada1c
Showing 49 changed files with 872 additions and 571 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

56 changes: 45 additions & 11 deletions ci/workflows/main-cron.yml
@@ -40,7 +40,7 @@ steps:
timeout_in_minutes: 12
retry: *auto-retry

- label: "build (deterministic simulation)"
- label: "build simulation test"
command: "ci/scripts/build-simulation.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
@@ -214,7 +214,7 @@ steps:
timeout_in_minutes: 22
retry: *auto-retry

- label: "unit test (deterministic simulation)"
- label: "unit test (madsim)"
key: "unit-test-deterministic"
command: "MADSIM_TEST_NUM=100 timeout 15m ci/scripts/deterministic-unit-test.sh"
if: |
@@ -229,7 +229,7 @@
timeout_in_minutes: 15
retry: *auto-retry

- label: "integration test (deterministic simulation) - scale"
- label: "integration test (madsim) - scale"
key: "integration-test-deterministic-scale"
command: "TEST_NUM=60 ci/scripts/deterministic-it-test.sh scale::"
if: |
@@ -246,7 +246,7 @@
timeout_in_minutes: 70
retry: *auto-retry

- label: "integration test (deterministic simulation) - recovery"
- label: "integration test (madsim) - recovery"
key: "integration-test-deterministic-recovery"
command: "TEST_NUM=60 ci/scripts/deterministic-it-test.sh recovery::"
if: |
@@ -263,9 +263,9 @@
timeout_in_minutes: 70
retry: *auto-retry

- label: "integration test (deterministic simulation) - others"
key: "integration-test-deterministic-others"
command: "TEST_NUM=30 ci/scripts/deterministic-it-test.sh backfill_tests:: storage:: sink::"
- label: "integration test (madsim) - backfill"
key: "integration-test-deterministic-backfill"
command: "TEST_NUM=30 ci/scripts/deterministic-it-test.sh backfill_tests::"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-integration-test-deterministic-simulation"
@@ -280,7 +280,41 @@
timeout_in_minutes: 70
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
- label: "integration test (madsim) - storage"
key: "integration-test-deterministic-storage"
command: "TEST_NUM=30 ci/scripts/deterministic-it-test.sh storage::"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-integration-test-deterministic-simulation"
|| build.env("CI_STEPS") =~ /(^|,)integration-tests?-deterministic-simulation(,|$$)/
depends_on: "build-simulation"
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 70
retry: *auto-retry

- label: "integration test (madsim) - sink"
key: "integration-test-deterministic-sink"
command: "TEST_NUM=30 ci/scripts/deterministic-it-test.sh sink::"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-integration-test-deterministic-simulation"
|| build.env("CI_STEPS") =~ /(^|,)integration-tests?-deterministic-simulation(,|$$)/
depends_on: "build-simulation"
plugins:
- docker-compose#v4.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 70
retry: *auto-retry

- label: "end-to-end test (madsim)"
key: "e2e-test-deterministic"
command: "TEST_NUM=64 timeout 55m ci/scripts/deterministic-e2e-test.sh"
if: |
@@ -302,7 +336,7 @@
timeout_in_minutes: 60
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
- label: "recovery test (madsim)"
key: "recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
@@ -321,7 +355,7 @@
retry: *auto-retry

# Ddl statements will randomly run with background_ddl.
- label: "background_ddl, arrangement_backfill recovery test (deterministic simulation)"
- label: "background_ddl, arrangement_backfill recovery test (madsim)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=--use-arrangement-backfill timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
@@ -340,7 +374,7 @@
retry: *auto-retry

# Ddl statements will randomly run with background_ddl.
- label: "background_ddl recovery test (deterministic simulation)"
- label: "background_ddl recovery test (madsim)"
key: "background-ddl-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 timeout 55m ci/scripts/deterministic-recovery-test.sh"
if: |
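
A note on the "(madsim)" renames above: these steps all run under madsim, a deterministic-simulation runtime for Rust. A minimal sketch of the pattern, assuming the madsim crate's #[madsim::test] macro and the MADSIM_TEST_NUM variable set in the commands above (the test body itself is hypothetical):

    // madsim reruns the body under several deterministic seeds;
    // MADSIM_TEST_NUM controls how many seeds are tried.
    #[madsim::test]
    async fn simulated_time_is_deterministic() {
        let start = madsim::time::Instant::now();
        // Advances the simulated clock instantly instead of really sleeping.
        madsim::time::sleep(std::time::Duration::from_secs(60)).await;
        assert!(start.elapsed() >= std::time::Duration::from_secs(60));
    }
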
2 changes: 1 addition & 1 deletion dashboard/next.config.js
@@ -19,7 +19,7 @@
* @type {import('next').NextConfig}
*/
const nextConfig = {
-  output: 'export',
+  output: "export",
trailingSlash: true,
}

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -3678,6 +3678,26 @@ def section_memory_manager(outer_panels):
),
],
),
+            panels.timeseries_memory(
+                "The resident memory of jemalloc",
+                "",
+                [
+                    panels.target(
+                        f"{metric('jemalloc_resident_bytes')}",
+                        "",
+                    ),
+                ],
+            ),
+            panels.timeseries_memory(
+                "The metadata memory of jemalloc",
+                "",
+                [
+                    panels.target(
+                        f"{metric('jemalloc_metadata_bytes')}",
+                        "",
+                    ),
+                ],
+            ),
panels.timeseries_memory(
"The allocated memory of jvm",
"",
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/cmd_all/src/standalone.rs
@@ -141,7 +141,7 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(meta_opts) = meta_opts.as_mut() {
-        meta_opts.prometheus_host = Some(prometheus_listener_addr.clone());
+        meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
}
}

@@ -246,6 +246,7 @@ mod test {

// Test parsing into node-level opts.
let actual = parse_standalone_opt_args(&opts);

check(
actual,
expect![[r#"
@@ -257,7 +258,7 @@
listen_addr: "127.0.0.1:8001",
advertise_addr: "127.0.0.1:9999",
dashboard_host: None,
-                prometheus_host: Some(
+                prometheus_listener_addr: Some(
"127.0.0.1:1234",
),
etcd_endpoints: "",
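
The test above uses inline snapshot assertions. A self-contained sketch of the same pattern, assuming the expect_test crate (the `check` helper mirrors the one in this file; the data is made up):

    use expect_test::{expect, Expect};

    // Compares a value's pretty-printed Debug output against an inline
    // snapshot; rerun with UPDATE_EXPECT=1 to rewrite the snapshot in place.
    fn check(actual: impl std::fmt::Debug, expect: Expect) {
        expect.assert_debug_eq(&actual);
    }

    #[test]
    fn snapshot_example() {
        check(
            vec![1, 2],
            expect![[r#"
                [
                    1,
                    2,
                ]
            "#]],
        );
    }
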
2 changes: 2 additions & 0 deletions src/compute/src/lib.rs
@@ -64,6 +64,8 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_ADVERTISE_ADDR")]
pub advertise_addr: Option<String>,

+    /// We will start an HTTP server at this address via `MetricsManager`.
+    /// The Prometheus instance will then poll metrics from this address.
#[clap(
long,
env = "RW_PROMETHEUS_LISTENER_ADDR",
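
The new doc comment describes the flag declared directly below it. As a standalone sketch of that clap pattern (hypothetical struct name and default value; only the env var RW_PROMETHEUS_LISTENER_ADDR appears in the diff), assuming clap 4 with the "derive" and "env" features:

    use clap::Parser;

    #[derive(Parser, Debug)]
    struct DemoOpts {
        /// Address the embedded metrics HTTP server listens on;
        /// Prometheus scrapes metrics from here.
        #[clap(long, env = "RW_PROMETHEUS_LISTENER_ADDR", default_value = "127.0.0.1:1222")]
        prometheus_listener_addr: String,
    }

    fn main() {
        // A CLI flag overrides the env var, which overrides the default.
        let opts = DemoOpts::parse();
        println!("metrics served on {}", opts.prometheus_listener_addr);
    }
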
21 changes: 18 additions & 3 deletions src/compute/src/memory/controller.rs
@@ -99,21 +99,30 @@ impl std::fmt::Debug for LruWatermarkController {
///
/// - `stats.allocated`: Total number of bytes allocated by the application.
/// - `stats.active`: Total number of bytes in active pages allocated by the application. This is a multiple of the page size, and greater than or equal to `stats.allocated`. This does not include `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator metadata.
+/// - `stats.resident`: Total number of bytes in physically resident data pages mapped by the allocator.
+/// - `stats.metadata`: Total number of bytes dedicated to jemalloc metadata.
///
/// Reference: <https://jemalloc.net/jemalloc.3.html>
-fn jemalloc_memory_stats() -> (usize, usize) {
+fn jemalloc_memory_stats() -> (usize, usize, usize, usize) {
if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
}
let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
let active = tikv_jemalloc_ctl::stats::active::read().unwrap();
-    (allocated, active)
+    let resident = tikv_jemalloc_ctl::stats::resident::read().unwrap();
+    let metadata = tikv_jemalloc_ctl::stats::metadata::read().unwrap();
+    (allocated, active, resident, metadata)
}

impl LruWatermarkController {
pub fn tick(&mut self, interval_ms: u32) -> Epoch {
// NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM
-        let (jemalloc_allocated_bytes, jemalloc_active_bytes) = jemalloc_memory_stats();
+        let (
+            jemalloc_allocated_bytes,
+            jemalloc_active_bytes,
+            jemalloc_resident_bytes,
+            jemalloc_metadata_bytes,
+        ) = jemalloc_memory_stats();
let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();

let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes;
@@ -188,6 +197,12 @@ impl LruWatermarkController {
self.metrics
.jemalloc_active_bytes
.set(jemalloc_active_bytes as i64);
+        self.metrics
+            .jemalloc_resident_bytes
+            .set(jemalloc_resident_bytes as i64);
+        self.metrics
+            .jemalloc_metadata_bytes
+            .set(jemalloc_metadata_bytes as i64);
self.metrics
.jvm_allocated_bytes
.set(jvm_allocated_bytes as i64);
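
The same jemalloc counters can be read in a standalone program. A sketch under the assumption that tikv-jemallocator is installed as the global allocator (otherwise the stats describe an allocator that is never used):

    #[global_allocator]
    static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

    fn main() {
        let _buf = vec![0u8; 64 << 20]; // allocate something measurable
        // jemalloc caches its stats per epoch; advance to refresh them.
        tikv_jemalloc_ctl::epoch::advance().unwrap();
        let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
        let resident = tikv_jemalloc_ctl::stats::resident::read().unwrap();
        let metadata = tikv_jemalloc_ctl::stats::metadata::read().unwrap();
        println!("allocated={allocated} resident={resident} metadata={metadata}");
    }
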
15 changes: 10 additions & 5 deletions src/connector/src/common.rs
@@ -41,7 +41,7 @@ use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

-pub const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
+pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

#[derive(Debug, Clone, Deserialize)]
@@ -148,10 +148,6 @@ pub struct KafkaCommon {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

-    #[serde(rename = "broker.rewrite.endpoints")]
-    #[serde_as(as = "Option<JsonString>")]
-    pub broker_rewrite_map: Option<HashMap<String, String>>,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

@@ -224,6 +220,15 @@ pub struct KafkaCommon {
sasl_oathbearer_config: Option<String>,
}

+#[serde_as]
+#[derive(Debug, Clone, Deserialize, WithOptions)]
+pub struct KafkaPrivateLinkCommon {
+    /// This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, rather than provided by users.
+    #[serde(rename = "broker.rewrite.endpoints")]
+    #[serde_as(as = "Option<JsonString>")]
+    pub broker_rewrite_map: Option<HashMap<String, String>>,
+}

const fn default_kafka_sync_call_timeout() -> Duration {
Duration::from_secs(5)
}
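
The `Option<JsonString>` adapter on the new struct is what lets a JSON-encoded string property become a typed map. A standalone sketch of that behavior (hypothetical struct name; the RisingWave-specific WithOptions derive is omitted), assuming serde, serde_json, and serde_with:

    use std::collections::HashMap;

    use serde::Deserialize;
    use serde_with::{json::JsonString, serde_as};

    #[serde_as]
    #[derive(Debug, Deserialize)]
    struct PrivateLinkDemo {
        #[serde(rename = "broker.rewrite.endpoints")]
        #[serde_as(as = "Option<JsonString>")]
        broker_rewrite_map: Option<HashMap<String, String>>,
    }

    fn main() {
        // The property value is itself a JSON document, serialized as a string.
        let props = r#"{"broker.rewrite.endpoints": "{\"broker1\": \"10.0.0.1:8001\"}"}"#;
        let demo: PrivateLinkDemo = serde_json::from_str(props).unwrap();
        assert_eq!(demo.broker_rewrite_map.unwrap()["broker1"], "10.0.0.1:8001");
    }
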
16 changes: 14 additions & 2 deletions src/connector/src/sink/kafka.rs
@@ -33,7 +33,7 @@ use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
-use crate::common::{KafkaCommon, RdKafkaPropertiesCommon};
+use crate::common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
@@ -232,6 +232,9 @@ pub struct KafkaConfig {

#[serde(flatten)]
pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

+    #[serde(flatten)]
+    pub privatelink_common: KafkaPrivateLinkCommon,
}

impl KafkaConfig {
@@ -261,6 +264,7 @@ impl From<KafkaConfig> for KafkaProperties {
common: val.common,
rdkafka_properties_common: val.rdkafka_properties_common,
rdkafka_properties_consumer: Default::default(),
+            privatelink_common: val.privatelink_common,
unknown_fields: Default::default(),
}
}
@@ -403,7 +407,7 @@ impl KafkaSinkWriter {

// Create the producer context, will be used to create the producer
let producer_ctx = PrivateLinkProducerContext::new(
-            config.common.broker_rewrite_map.clone(),
+            config.privatelink_common.broker_rewrite_map.clone(),
// fixme: enable kafka native metrics for sink
None,
None,
@@ -656,13 +660,21 @@ mod test {
"properties.sasl.password".to_string() => "test".to_string(),
"properties.retry.max".to_string() => "20".to_string(),
"properties.retry.interval".to_string() => "500ms".to_string(),
+            // PrivateLink
+            "broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};
let config = KafkaConfig::from_hashmap(properties).unwrap();
assert_eq!(config.common.brokers, "localhost:9092");
assert_eq!(config.common.topic, "test");
assert_eq!(config.max_retry_num, 20);
assert_eq!(config.retry_interval, Duration::from_millis(500));

+        // PrivateLink fields
+        let hashmap: HashMap<String, String> = hashmap! {
+            "broker1".to_string() => "10.0.0.1:8001".to_string()
+        };
+        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(hashmap));

// Optional fields eliminated.
let properties: HashMap<String, String> = hashmap! {
// "connector".to_string() => "kafka".to_string(),
7 changes: 5 additions & 2 deletions src/connector/src/source/base.rs
@@ -719,8 +719,11 @@

let props = ConnectorProperties::extract(props, true).unwrap();
if let ConnectorProperties::Kafka(k) = props {
-            assert!(k.common.broker_rewrite_map.is_some());
-            println!("{:?}", k.common.broker_rewrite_map);
+            let hashmap: HashMap<String, String> = hashmap! {
+                "b-1:9092".to_string() => "dns-1".to_string(),
+                "b-2:9092".to_string() => "dns-2".to_string(),
+            };
+            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(hashmap));
} else {
panic!("extract kafka config failed");
}
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
@@ -62,7 +62,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
let common_props = &properties.common;

let broker_address = common_props.brokers.clone();
-        let broker_rewrite_map = common_props.broker_rewrite_map.clone();
+        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
let topic = common_props.topic.clone();
config.set("bootstrap.servers", &broker_address);
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
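
For context on the `config.set(...)` calls above: `config` here is an rdkafka client configuration, a thin string-keyed property map. A minimal sketch, assuming the rdkafka crate (the isolation-level value is an assumption; the diff only shows the KAFKA_ISOLATION_LEVEL constant):

    use rdkafka::ClientConfig;

    fn main() {
        // Keys and values mirror librdkafka's plain string configuration.
        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", "localhost:9092");
        config.set("isolation.level", "read_committed");
    }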