Skip to content

Commit

Permalink
feat(tracing): support minitrace + jaeger (risingwavelabs#6601)
Browse files Browse the repository at this point in the history
* feat(tracing): support minitrace + jaeger

Signed-off-by: Bugen Zhao <[email protected]>

* fix simulation test

Signed-off-by: Bugen Zhao <[email protected]>

* flip branch

Signed-off-by: Bugen Zhao <[email protected]>

* fix

Signed-off-by: Bugen Zhao <[email protected]>

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BugenZhao and mergify[bot] authored Nov 29, 2022
1 parent 1db4737 commit a061017
Show file tree
Hide file tree
Showing 21 changed files with 140 additions and 67 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions src/cmd/src/bin/compute_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ fn main() {

let opts = risingwave_compute::ComputeNodeOpts::parse();

risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(
opts.enable_jaeger_tracing,
false,
));
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(false));

risingwave_rt::main_okk(risingwave_compute::start(opts))
}
5 changes: 1 addition & 4 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ fn main() -> Result<()> {

let opts = risingwave_compute::ComputeNodeOpts::parse_from(args);

risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(
opts.enable_jaeger_tracing,
false,
));
risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(false));

risingwave_rt::main_okk(risingwave_compute::start(opts));

Expand Down
10 changes: 10 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ pub async fn compute_node_serve(
state_store_metrics.clone(),
object_store_metrics,
TieredCacheMetricsBuilder::new(registry.clone()),
if opts.enable_jaeger_tracing {
Arc::new(
risingwave_tracing::RwTracingService::new(risingwave_tracing::TracingConfig::new(
"127.0.0.1:6831".to_string(),
))
.unwrap(),
)
} else {
Arc::new(risingwave_tracing::RwTracingService::disabled())
},
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ risingwave_pb = { path = "../prost" }
risingwave_rpc_client = { path = "../rpc_client" }
risingwave_storage = { path = "../storage" }
risingwave_stream = { path = "../stream" }
risingwave_tracing = { path = "../tracing" }
serde_json = "1"
size = "0.4"
tokio = { version = "0.2", package = "madsim-tokio", features = [
Expand Down
1 change: 1 addition & 0 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ risectl requires a full persistent cluster to operate. Please make sure you're n
metrics.state_store_metrics.clone(),
metrics.object_store_metrics.clone(),
TieredCacheMetricsBuilder::unused(),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
.await?;

Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pub async fn register_leader_for_meta<S: MetaStore>(

pub async fn rpc_serve_with_store<S: MetaStore>(
meta_store: Arc<S>,
mut address_info: AddressInfo,
address_info: AddressInfo,
max_heartbeat_interval: Duration,
lease_interval_secs: u64,
opts: MetaOpts,
Expand Down Expand Up @@ -336,9 +336,9 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
);

#[cfg(not(madsim))]
if let Some(dashboard_addr) = address_info.dashboard_addr.take() {
if let Some(ref dashboard_addr) = address_info.dashboard_addr {
let dashboard_service = crate::dashboard::DashboardService {
dashboard_addr,
dashboard_addr: *dashboard_addr,
cluster_manager: cluster_manager.clone(),
fragment_manager: fragment_manager.clone(),
meta_store: env.meta_store_ref(),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ risingwave_hummock_sdk = { path = "../storage/hummock_sdk" }
risingwave_object_store = { path = "../object_store" }
risingwave_pb = { path = "../prost" }
risingwave_rpc_client = { path = "../rpc_client" }
risingwave_tracing = { path = "../tracing" }
scopeguard = "1"
# rocksdb = { git = "https://github.com/tikv/rust-rocksdb.git", rev = "fa83ff19", features = [
# "encryption",
Expand Down Expand Up @@ -83,7 +84,6 @@ twox-hash = "1"
zstd = "0.11.2"

[target.'cfg(not(madsim))'.dependencies]
risingwave_tracing = { path = "../tracing" }
workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async fn get_local_hummock_storage(
event_tx.clone(),
MemoryLimiter::unlimit(),
#[cfg(not(madsim))]
Arc::new(risingwave_tracing::RwTracingService::new()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ async fn test_snapshot_backward_range_scan_inner(enable_sync: bool, enable_commi
mock_hummock_meta_client.clone(),
get_test_notification_client(env, hummock_manager_ref, worker_node),
Arc::new(StateStoreMetrics::unused()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ async fn test_reload_storage() {
worker_node.clone(),
),
Arc::new(StateStoreMetrics::unused()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ pub(crate) async fn with_hummock_storage_v1() -> (HummockStorageV1, Arc<MockHumm
meta_client.clone(),
get_test_notification_client(env, hummock_manager_ref, worker_node),
Arc::new(StateStoreMetrics::unused()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
.await
.unwrap();
Expand Down
14 changes: 3 additions & 11 deletions src/storage/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ pub struct HummockStorage {

read_version_mapping: Arc<ReadVersionMappingType>,

#[cfg(not(madsim))]
tracing: Arc<risingwave_tracing::RwTracingService>,
}

Expand All @@ -148,6 +147,7 @@ impl HummockStorage {
notification_client: impl NotificationClient,
// TODO: separate `HummockStats` from `StateStoreMetrics`.
stats: Arc<StateStoreMetrics>,
tracing: Arc<risingwave_tracing::RwTracingService>,
) -> HummockResult<Self> {
let sstable_id_manager = Arc::new(SstableIdManager::new(
hummock_meta_client.clone(),
Expand Down Expand Up @@ -176,9 +176,6 @@ impl HummockStorage {
hummock_meta_client.clone(),
));

#[cfg(not(madsim))]
let tracing = Arc::new(risingwave_tracing::RwTracingService::new());

let compactor_context = Arc::new(Context::new_local_compact_context(
options.clone(),
sstable_store.clone(),
Expand Down Expand Up @@ -207,7 +204,6 @@ impl HummockStorage {
shutdown_sender: event_tx,
}),
read_version_mapping: hummock_event_handler.read_version_mapping(),
#[cfg(not(madsim))]
tracing,
};

Expand All @@ -233,7 +229,6 @@ impl HummockStorage {
self.hummock_version_reader.clone(),
self.hummock_event_sender.clone(),
self.buffer_tracker.get_memory_limiter().clone(),
#[cfg(not(madsim))]
self.tracing.clone(),
)
}
Expand Down Expand Up @@ -308,6 +303,7 @@ impl HummockStorage {
hummock_meta_client,
notification_client,
Arc::new(StateStoreMetrics::unused()),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
.await
}
Expand Down Expand Up @@ -472,7 +468,6 @@ pub struct HummockStorageV1 {

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,

#[cfg(not(madsim))]
tracing: Arc<risingwave_tracing::RwTracingService>,
}

Expand All @@ -485,6 +480,7 @@ impl HummockStorageV1 {
notification_client: impl NotificationClient,
// TODO: separate `HummockStats` from `StateStoreMetrics`.
stats: Arc<StateStoreMetrics>,
tracing: Arc<risingwave_tracing::RwTracingService>,
) -> HummockResult<Self> {
// For conflict key detection. Enabled by setting `write_conflict_detection_enabled` to
// true in `StorageConfig`
Expand Down Expand Up @@ -515,9 +511,6 @@ impl HummockStorageV1 {
hummock_meta_client.clone(),
));

#[cfg(not(madsim))]
let tracing = Arc::new(risingwave_tracing::RwTracingService::new());

let compactor_context = Arc::new(Context::new_local_compact_context(
options.clone(),
sstable_store.clone(),
Expand Down Expand Up @@ -571,7 +564,6 @@ impl HummockStorageV1 {
}),
version_update_notifier_tx: epoch_update_tx_clone,
hummock_event_sender: event_tx,
#[cfg(not(madsim))]
tracing,
};

Expand Down
5 changes: 1 addition & 4 deletions src/storage/src/hummock/state_store_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,7 @@ impl StateStoreRead for HummockStorageV1 {

let iter =
self.iter_inner::<ForwardIter>(epoch, map_table_key_range(key_range), read_options);
#[cfg(not(madsim))]
return iter.in_span(self.tracing.new_tracer("hummock_iter"));
#[cfg(madsim)]
iter
iter.in_span(self.tracing.new_tracer("hummock_iter"))
}
}

Expand Down
12 changes: 2 additions & 10 deletions src/storage/src/hummock/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::ops::Bound;
use std::sync::Arc;

use bytes::Bytes;
#[cfg(not(madsim))]
use minitrace::future::FutureExt;
use parking_lot::RwLock;
use risingwave_common::catalog::TableId;
Expand Down Expand Up @@ -65,8 +64,6 @@ pub struct HummockStorageCore {

pub struct LocalHummockStorage {
core: Arc<HummockStorageCore>,

#[cfg(not(madsim))]
tracing: Arc<risingwave_tracing::RwTracingService>,
}

Expand All @@ -76,7 +73,6 @@ impl Clone for LocalHummockStorage {
fn clone(&self) -> Self {
Self {
core: self.core.clone(),
#[cfg(not(madsim))]
tracing: self.tracing.clone(),
}
}
Expand Down Expand Up @@ -169,10 +165,7 @@ impl StateStoreRead for LocalHummockStorage {
let iter = self
.core
.iter_inner(map_table_key_range(key_range), epoch, read_options);
#[cfg(not(madsim))]
return iter.in_span(self.tracing.new_tracer("hummock_iter"));
#[cfg(madsim)]
iter
iter.in_span(self.tracing.new_tracer("hummock_iter"))
}
}

Expand Down Expand Up @@ -225,7 +218,7 @@ impl LocalHummockStorage {
hummock_version_reader: HummockVersionReader,
event_sender: mpsc::UnboundedSender<HummockEvent>,
memory_limiter: Arc<MemoryLimiter>,
#[cfg(not(madsim))] tracing: Arc<risingwave_tracing::RwTracingService>,
tracing: Arc<risingwave_tracing::RwTracingService>,
) -> Self {
let storage_core = HummockStorageCore::new(
instance_guard,
Expand All @@ -237,7 +230,6 @@ impl LocalHummockStorage {

Self {
core: Arc::new(storage_core),
#[cfg(not(madsim))]
tracing,
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ pub mod verify {

impl StateStoreImpl {
#[cfg_attr(not(target_os = "linux"), expect(unused_variables))]
#[allow(clippy::too_many_arguments)]
pub async fn new(
s: &str,
file_cache_dir: &str,
Expand All @@ -445,6 +446,7 @@ impl StateStoreImpl {
state_store_stats: Arc<StateStoreMetrics>,
object_store_metrics: Arc<ObjectStoreMetrics>,
tiered_cache_metrics_builder: TieredCacheMetricsBuilder,
tracing: Arc<risingwave_tracing::RwTracingService>,
) -> StorageResult<Self> {
#[cfg(not(target_os = "linux"))]
let tiered_cache = TieredCache::none();
Expand Down Expand Up @@ -512,6 +514,7 @@ impl StateStoreImpl {
hummock_meta_client.clone(),
notification_client,
state_store_stats.clone(),
tracing,
)
.await?;

Expand All @@ -523,6 +526,7 @@ impl StateStoreImpl {
hummock_meta_client.clone(),
notification_client,
state_store_stats.clone(),
tracing,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions src/tests/compaction_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ risingwave_pb = { path = "../../prost" }
risingwave_rpc_client = { path = "../../rpc_client" }
risingwave_rt = { path = "../../utils/runtime" }
risingwave_storage = { path = "../../storage", features = ["test"] }
risingwave_tracing = { path = "../../tracing" }
serde = { version = "1", features = ["derive"] }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
Expand Down
1 change: 1 addition & 0 deletions src/tests/compaction_test/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ pub async fn create_hummock_store_with_metrics(
metrics.state_store_metrics.clone(),
metrics.object_store_metrics.clone(),
TieredCacheMetricsBuilder::unused(),
Arc::new(risingwave_tracing::RwTracingService::disabled()),
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions src/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ repository = { workspace = true }
anyhow = "1"
futures = { version = "0.3", default-features = false, features = ["alloc", "executor"] }
minitrace = "0.4"
minitrace-jaeger = "0.4"
rand = "0.8"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"sync",
"macros",
Expand Down
Loading

0 comments on commit a061017

Please sign in to comment.