feat: batch telemetry event requests to avoid too many requests #20000
Changes from all commits: 14ab498, 9731e9c, 5c6578f, 015af02, 15de8b9, 3ed7e87, 9ffe508, a50d227, e793e2c, b0274f9
File 1:

```diff
@@ -14,13 +14,18 @@
 use std::sync::Arc;
 
-use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
+use risingwave_pb::telemetry::PbEventMessage;
 pub use risingwave_telemetry_event::{
-    current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
+    current_timestamp, do_telemetry_event_report, post_telemetry_report_pb,
+    TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
 };
+use risingwave_telemetry_event::{
+    get_telemetry_risingwave_cloud_uuid, TELEMETRY_EVENT_REPORT_STASH_SIZE,
+    TELEMETRY_EVENT_REPORT_TX,
+};
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
-use tokio::time::{interval, Duration};
+use tokio::time::{interval as tokio_interval_fn, Duration};
 use uuid::Uuid;
 
 use super::{Result, TELEMETRY_REPORT_INTERVAL};
@@ -60,9 +65,13 @@ where
     let begin_time = std::time::Instant::now();
     let session_id = Uuid::new_v4().to_string();
-    let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
+    let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
     interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
+    let mut event_interval =
+        tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
+    event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
     // fetch telemetry tracking_id from the meta node only at the beginning
     // There is only one case tracking_id updated at the runtime ---- metastore data has been
     // cleaned. There is no way that metastore has been cleaned but nodes are still running
@@ -91,9 +100,29 @@ where
         )
     });
 
+    let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<PbEventMessage>();
+    TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| {
+        tracing::warn!(
+            "Telemetry failed to set event reporting tx, event reporting will be disabled"
+        );
+    });
+    let mut event_stash = Vec::new();
+
     loop {
         tokio::select! {
             _ = interval.tick() => {},
+            event = event_rx.recv() => {
+                debug_assert!(event.is_some());
+                event_stash.push(event.unwrap());
+
+                if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE {
+                    do_telemetry_event_report(&mut event_stash).await;
+                }
+                continue;
+            }
+            _ = event_interval.tick() => {
+                do_telemetry_event_report(&mut event_stash).await;
+                continue;
+            },
+            _ = &mut shutdown_rx => {
                 tracing::info!("Telemetry exit");
                 return;
```

> **Review comment** (on `event_stash.push(event.unwrap());`): when using standalone mode, each of the components panics at the very beginning, is it a known issue?
>
> **Reply:** I guess no, will validate tomorrow.
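The heart of the change is this select loop: producers push events onto an unbounded channel, and the loop flushes the stash either when it reaches `TELEMETRY_EVENT_REPORT_STASH_SIZE` or when the ten-second `event_interval` fires, whichever comes first. Below is a minimal, self-contained sketch of the same pattern; the names `flush`, `STASH_SIZE`, and `FLUSH_SECS` are illustrative stand-ins, not the PR's actual identifiers.

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};

const STASH_SIZE: usize = 100; // flush once this many events are buffered
const FLUSH_SECS: u64 = 10; // ...or at least every FLUSH_SECS seconds

// Stand-in for `do_telemetry_event_report`: drain the stash and report it.
async fn flush(stash: &mut Vec<String>) {
    if stash.is_empty() {
        return; // nothing accumulated since the last flush
    }
    let batch = std::mem::take(stash); // leaves an empty Vec behind for reuse
    println!("reporting {} events in one request", batch.len());
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(FLUSH_SECS));
        // Skip missed ticks instead of bursting to catch up, as the PR does.
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let mut stash: Vec<String> = Vec::new();

        loop {
            tokio::select! {
                event = rx.recv() => {
                    let Some(event) = event else { break }; // all senders dropped
                    stash.push(event);
                    if stash.len() >= STASH_SIZE {
                        flush(&mut stash).await; // size-based flush
                    }
                }
                _ = ticker.tick() => {
                    flush(&mut stash).await; // time-based flush
                }
            }
        }
    });

    tx.send("event-1".to_owned()).unwrap();
    sleep(Duration::from_millis(50)).await; // give the batcher a chance to run
}
```

Combining a size threshold with a timer bounds both memory (the stash never grows past the threshold for long) and latency (events sit at most one interval before being reported).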
File 2:

```diff
@@ -21,9 +21,11 @@ use std::sync::OnceLock;
 
 use prost::Message;
 use risingwave_pb::telemetry::{
-    EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
+    EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
     TelemetryEventStage as PbTelemetryEventStage,
 };
+use thiserror_ext::AsReport;
+use tokio::sync::mpsc::UnboundedSender;
 pub use util::*;
 
 pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
@@ -32,6 +34,7 @@ pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
 pub type TelemetryError = String;
 
 pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
+pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();
 
 pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
```
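`TELEMETRY_EVENT_REPORT_TX` is a `std::sync::OnceLock`, so it can be initialized exactly once; a second `set` returns `Err` with the rejected value, which is why the report loop above handles it with `unwrap_or_else` and a warning rather than panicking. A small sketch of that set-once/get pattern, with illustrative names and a `u32` payload standing in for `PbEventMessage`:

```rust
use std::sync::OnceLock;

use tokio::sync::mpsc::UnboundedSender;

// Global sender: set once by the reporting loop, read by any producer.
static EVENT_TX: OnceLock<UnboundedSender<u32>> = OnceLock::new();

fn install(tx: UnboundedSender<u32>) {
    // `set` returns Err(tx) if a sender was already installed.
    if EVENT_TX.set(tx).is_err() {
        eprintln!("event sender already installed; keeping the existing one");
    }
}

fn enqueue(event: u32) {
    // Producers degrade gracefully if no sender was ever installed.
    if let Some(tx) = EVENT_TX.get() {
        let _ = tx.send(event);
    }
}

fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    install(tx);
    enqueue(42);
    assert_eq!(rx.blocking_recv(), Some(42)); // outside any runtime, blocking is fine
}
```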
```diff
@@ -42,6 +45,21 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
     env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
 }
 
+pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
+    const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url
+    let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
+    let batch_message = PbBatchEventMessage {
+        events: std::mem::take(event_stash),
+    };
+
+    post_telemetry_report_pb(&url, batch_message.encode_to_vec())
+        .await
+        .unwrap_or_else(|e| tracing::debug!("{}", e));
+}
+
+pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds
+pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action
+
 pub fn report_event_common(
     event_stage: PbTelemetryEventStage,
     event_name: &str,
```

> **Review comment** (on `TELEMETRY_EVENT_REPORT_INTERVAL`): Is the size of the events accumulated in 10s controllable? If not, I am concerned that doing only time-based batching can cause memory pressure / OOM on the node. We might need to consider adding size-based batching as well.
>
> **Reply:** It makes sense. Let me fix.
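Note how `do_telemetry_event_report` takes `&mut Vec` and drains it with `std::mem::take`: the accumulated events move into the batch message by value (no clone), and the caller is left holding a fresh empty `Vec` that keeps accumulating. A tiny illustration of that idiom:

```rust
fn main() {
    let mut stash = vec!["a", "b", "c"];

    // Moves the contents out, swapping in Vec::default() (an empty Vec).
    let batch = std::mem::take(&mut stash);

    assert_eq!(batch.len(), 3); // the flusher owns the events now
    assert!(stash.is_empty()); // the caller's buffer is immediately reusable
}
```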
```diff
@@ -95,15 +113,12 @@ pub fn request_to_telemetry_event(
         node,
         is_test,
     };
-    let report_bytes = event.encode_to_vec();
 
-    tokio::spawn(async move {
-        const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
-        let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
-        post_telemetry_report_pb(&url, report_bytes)
-            .await
-            .unwrap_or_else(|e| tracing::info!("{}", e))
-    });
+    if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
+        let _ = tx.send(event).inspect_err(|e| {
+            tracing::warn!("Failed to send telemetry event queue: {}", e.as_report())
+        });
+    }
 }
 
 #[cfg(test)]
```
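With this hunk, the producer side becomes fire-and-forget: instead of spawning a task that POSTs each event to the `/event` endpoint, `request_to_telemetry_event` just pushes onto the unbounded channel, which never blocks or awaits, and a send error (receiver gone, e.g. during shutdown) is logged rather than propagated. A sketch of that idiom, with `Event` and `report` as hypothetical stand-ins for `PbEventMessage` and the real reporting function:

```rust
use tokio::sync::mpsc::UnboundedSender;

#[derive(Debug)]
struct Event {
    name: String,
}

// Hypothetical producer: enqueue and move on; never block the caller.
fn report(tx: &UnboundedSender<Event>, name: &str) {
    let event = Event { name: name.to_owned() };
    // `send` only fails when the receiver is gone (e.g. during shutdown);
    // telemetry is best-effort, so log and continue.
    let _ = tx
        .send(event)
        .inspect_err(|e| eprintln!("telemetry event dropped: {e}"));
}

fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    report(&tx, "create_table");
    println!("{:?}", rx.blocking_recv()); // Some(Event { name: "create_table" })
}
```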
> **Review comment:** The telemetry service still receives requests from legacy clusters, and it is still possible to get reports from a cluster with an etcd backend. I believe the field was changed by mistake in #18621. There is no breaking change in the proto file, because there have been no updates on the server side since then.