-
Notifications
You must be signed in to change notification settings - Fork 595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: batching telemetry event request avoid too many requests #20000
Changes from 5 commits
14ab498
9731e9c
5c6578f
015af02
15de8b9
3ed7e87
9ffe508
a50d227
e793e2c
b0274f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,13 +14,19 @@ | |
|
||
use std::sync::Arc; | ||
|
||
use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid; | ||
use risingwave_pb::telemetry::PbEventMessage; | ||
pub use risingwave_telemetry_event::{ | ||
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID, | ||
current_timestamp, do_telemetry_event_report, post_telemetry_report_pb, | ||
TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_EVENT_REPORT_STASH, TELEMETRY_REPORT_URL, | ||
TELEMETRY_TRACKING_ID, | ||
}; | ||
use risingwave_telemetry_event::{ | ||
get_telemetry_risingwave_cloud_uuid, TELEMETRY_EVENT_REPORT_STASH_SIZE, | ||
TELEMETRY_EVENT_REPORT_TX, | ||
}; | ||
use tokio::sync::oneshot::Sender; | ||
use tokio::task::JoinHandle; | ||
use tokio::time::{interval, Duration}; | ||
use tokio::time::{interval as tokio_interval_fn, Duration}; | ||
use uuid::Uuid; | ||
|
||
use super::{Result, TELEMETRY_REPORT_INTERVAL}; | ||
|
@@ -60,9 +66,13 @@ where | |
|
||
let begin_time = std::time::Instant::now(); | ||
let session_id = Uuid::new_v4().to_string(); | ||
let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); | ||
let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL)); | ||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
||
let mut event_interval = | ||
tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL)); | ||
event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||
|
||
// fetch telemetry tracking_id from the meta node only at the beginning | ||
// There is only one case tracking_id updated at the runtime ---- metastore data has been | ||
// cleaned. There is no way that metastore has been cleaned but nodes are still running | ||
|
@@ -91,9 +101,29 @@ where | |
) | ||
}); | ||
|
||
let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<PbEventMessage>(); | ||
TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| { | ||
tracing::warn!( | ||
"Telemetry failed to set event reporting tx, event reporting will be disabled" | ||
); | ||
}); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = interval.tick() => {}, | ||
event = event_rx.recv() => { | ||
if let Some(event) = event { | ||
tabVersion marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TELEMETRY_EVENT_REPORT_STASH.lock().await.push(event); | ||
} | ||
if TELEMETRY_EVENT_REPORT_STASH.lock().await.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
let mut batch = Vec::new();
loop {
....
tokio::select! {
...
... => {
...
do_telemetry_event_report(std::mem::take(&mut batch)).await;
}
...
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My intention is to move all event report stuff inside the |
||
do_telemetry_event_report().await; | ||
} | ||
continue; | ||
} | ||
_ = event_interval.tick() => { | ||
do_telemetry_event_report().await; | ||
continue; | ||
}, | ||
_ = &mut shutdown_rx => { | ||
tracing::info!("Telemetry exit"); | ||
return; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,15 @@ | |
mod util; | ||
|
||
use std::env; | ||
use std::sync::OnceLock; | ||
use std::sync::{LazyLock, OnceLock}; | ||
|
||
use prost::Message; | ||
use risingwave_pb::telemetry::{ | ||
EventMessage as PbEventMessage, PbTelemetryDatabaseObject, | ||
EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject, | ||
TelemetryEventStage as PbTelemetryEventStage, | ||
}; | ||
use tokio::sync::mpsc::UnboundedSender; | ||
use tokio::sync::Mutex; | ||
pub use util::*; | ||
|
||
pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>; | ||
|
@@ -32,6 +34,7 @@ pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>; | |
pub type TelemetryError = String; | ||
|
||
pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new(); | ||
pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new(); | ||
|
||
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report"; | ||
|
||
|
@@ -42,6 +45,28 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> { | |
env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok() | ||
} | ||
|
||
pub static TELEMETRY_EVENT_REPORT_STASH: LazyLock<Mutex<Vec<PbEventMessage>>> = | ||
LazyLock::new(|| Mutex::new(Vec::new())); | ||
|
||
pub async fn do_telemetry_event_report() { | ||
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event"; | ||
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); | ||
let mut batch_message = PbBatchEventMessage { events: Vec::new() }; | ||
|
||
let mut stash_guard = TELEMETRY_EVENT_REPORT_STASH.lock().await; | ||
for event in stash_guard.drain(..) { | ||
batch_message.events.push(event); | ||
} | ||
drop(stash_guard); | ||
|
||
post_telemetry_report_pb(&url, batch_message.encode_to_vec()) | ||
.await | ||
.unwrap_or_else(|e| tracing::debug!("{}", e)); | ||
} | ||
|
||
pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the size of the events accumulated in 10s controllable? If not, I am concerned that doing only time-based batching can cause memory pressure / OOM on the node. We might need to consider adding size-based batching as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes sense. Let me fix. |
||
pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action | ||
|
||
pub fn report_event_common( | ||
event_stage: PbTelemetryEventStage, | ||
event_name: &str, | ||
|
@@ -95,15 +120,10 @@ pub fn request_to_telemetry_event( | |
node, | ||
is_test, | ||
}; | ||
let report_bytes = event.encode_to_vec(); | ||
|
||
tokio::spawn(async move { | ||
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event"; | ||
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned(); | ||
post_telemetry_report_pb(&url, report_bytes) | ||
.await | ||
.unwrap_or_else(|e| tracing::info!("{}", e)) | ||
}); | ||
|
||
if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() { | ||
tx.send(event).unwrap(); | ||
tabVersion marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
#[cfg(test)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
telemetry service still receive requests from legacy clusters and it is still possible getting reports from a ETCD backend cluster.
I believe the field is changed by mistake in #18621
There is no breaking change in the proto file because there is no updates on the server side since then.