feat: batching telemetry event request avoid too many requests #20000
@@ -17,13 +17,14 @@
 mod util;

 use std::env;
-use std::sync::OnceLock;
+use std::sync::{LazyLock, OnceLock};

 use prost::Message;
 use risingwave_pb::telemetry::{
-    EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
+    EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
     TelemetryEventStage as PbTelemetryEventStage,
 };
+use tokio::sync::Mutex;
 pub use util::*;

 pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
@@ -42,6 +43,27 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
     env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
 }

+static TELEMETRY_EVENT_REPORT_STASH: LazyLock<Mutex<Vec<PbEventMessage>>> =
+    LazyLock::new(|| Mutex::new(Vec::new()));
+
+pub async fn do_telemetry_event_report() {
+    const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
+    let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
+    let mut batch_message = PbBatchEventMessage { events: Vec::new() };
+
+    let mut stash_guard = TELEMETRY_EVENT_REPORT_STASH.lock().await;
+    for event in stash_guard.drain(..) {
+        batch_message.events.push(event);
+    }
+    drop(stash_guard);
+
+    post_telemetry_report_pb(&url, batch_message.encode_to_vec())
+        .await
+        .unwrap_or_else(|e| tracing::debug!("{}", e));
+}
+
+pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds
Review comment (on TELEMETRY_EVENT_REPORT_INTERVAL):
Is the size of the events accumulated in 10s controllable? If not, I am concerned that doing only time-based batching can cause memory pressure / OOM on the node. We might need to consider adding size-based batching as well.

Reply:
It makes sense. Let me fix.
+
 pub fn report_event_common(
     event_stage: PbTelemetryEventStage,
     event_name: &str,
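Responding to the size-based batching concern raised in the thread above, the following is a minimal sketch of how the periodic flush could be driven and combined with a size threshold. It is not the PR's actual wiring: `spawn_telemetry_event_reporter`, `stash_event_with_size_limit`, and `TELEMETRY_EVENT_MAX_BATCH_SIZE` are hypothetical names introduced only for illustration, and the sketch assumes the items and imports from the hunk above (`do_telemetry_event_report`, `TELEMETRY_EVENT_REPORT_STASH`, `TELEMETRY_EVENT_REPORT_INTERVAL`, `PbEventMessage`) are in scope. The PR's real call site is synchronous and uses `blocking_lock`, so the async stash helper here only illustrates the flush-on-size idea.

```rust
use std::time::Duration;

/// Hypothetical cap on stashed events before an early flush (not part of the PR).
const TELEMETRY_EVENT_MAX_BATCH_SIZE: usize = 1024;

/// Spawn the periodic reporter; assumed to be called once during node start-up.
fn spawn_telemetry_event_reporter() {
    tokio::spawn(async move {
        let mut ticker =
            tokio::time::interval(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
        loop {
            ticker.tick().await;
            // Drain whatever accumulated during the last interval and POST it as one batch.
            do_telemetry_event_report().await;
        }
    });
}

/// Size-aware stashing: flush immediately once the stash grows too large,
/// instead of waiting for the next tick.
async fn stash_event_with_size_limit(event: PbEventMessage) {
    let should_flush = {
        let mut stash = TELEMETRY_EVENT_REPORT_STASH.lock().await;
        stash.push(event);
        stash.len() >= TELEMETRY_EVENT_MAX_BATCH_SIZE
    };
    if should_flush {
        do_telemetry_event_report().await;
    }
}
```

Flushing on size as well as on the interval bounds the memory held by the stash between ticks, which is what the reviewer's OOM concern asks for.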
@@ -95,15 +117,8 @@ pub fn request_to_telemetry_event(
         node,
         is_test,
     };
-    let report_bytes = event.encode_to_vec();
-
-    tokio::spawn(async move {
-        const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
-        let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
-        post_telemetry_report_pb(&url, report_bytes)
-            .await
-            .unwrap_or_else(|e| tracing::info!("{}", e))
-    });
+
+    TELEMETRY_EVENT_REPORT_STASH.blocking_lock().push(event);
Review comment (on the blocking_lock call):
Given that we call

Reply:
after switching to unbounded channel, there seems no need for the sync Mutex.
 }

 #[cfg(test)]
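The thread above suggests replacing the `Mutex<Vec<_>>` stash and the `blocking_lock()` call (which panics if invoked from inside an async execution context) with an unbounded channel. Below is a rough sketch of that alternative, not the code that eventually landed: `EVENT_TX`, `start_telemetry_event_batcher`, and `stash_event` are illustrative names, a start-up hook to spawn the consumer task is assumed to exist, and the module-level imports from the first hunk (`OnceLock`, `prost::Message`, the `Pb*` types) are assumed to be in scope.

```rust
use std::time::Duration;

use tokio::sync::mpsc;

/// Sender half of the event channel; set once at start-up (illustrative only).
static EVENT_TX: OnceLock<mpsc::UnboundedSender<PbEventMessage>> = OnceLock::new();

/// Hypothetical start-up hook: spawns a single consumer that batches by time
/// (a size cap could be added the same way as in the earlier sketch).
pub fn start_telemetry_event_batcher() {
    let (tx, mut rx) = mpsc::unbounded_channel::<PbEventMessage>();
    let _ = EVENT_TX.set(tx);
    tokio::spawn(async move {
        let mut ticker =
            tokio::time::interval(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
        let mut batch: Vec<PbEventMessage> = Vec::new();
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    if batch.is_empty() {
                        continue;
                    }
                    let msg = PbBatchEventMessage { events: std::mem::take(&mut batch) };
                    // "event" mirrors TELEMETRY_EVENT_REPORT_TYPE in the diff above.
                    let url = TELEMETRY_REPORT_URL.to_owned() + "/event";
                    post_telemetry_report_pb(&url, msg.encode_to_vec())
                        .await
                        .unwrap_or_else(|e| tracing::debug!("{}", e));
                }
                Some(event) = rx.recv() => batch.push(event),
            }
        }
    });
}

/// Call sites would replace `TELEMETRY_EVENT_REPORT_STASH.blocking_lock().push(event)`
/// with a non-blocking send that never touches a sync mutex.
pub fn stash_event(event: PbEventMessage) {
    if let Some(tx) = EVENT_TX.get() {
        let _ = tx.send(event);
    }
}
```

With the channel, producers never block and the consumer task owns the batch, so the sync `Mutex` (and the `blocking_lock` panic hazard) goes away; bounding memory would still need the size cap discussed earlier.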
Review comment:
The telemetry service still receives requests from legacy clusters, and it is still possible to get reports from a cluster with an etcd backend. I believe the field was changed by mistake in #18621. There is no breaking change in the proto file because there have been no updates on the server side since then.