feat: batch telemetry event requests to avoid too many requests #20000

Merged
merged 10 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
7 changes: 5 additions & 2 deletions proto/telemetry.proto
@@ -4,11 +4,10 @@

option go_package = "risingwavelabs.com/risingwave/proto/telemetry";

enum MetaBackend {

Check failure on line 7 in proto/telemetry.proto (GitHub Actions / Check breaking changes in Protobuf files):
- Previously present reserved name "META_BACKEND_ETCD" on enum "MetaBackend" was deleted.
- Previously present reserved range "[2]" on enum "MetaBackend" is missing values: [2] were removed.
META_BACKEND_UNSPECIFIED = 0;
META_BACKEND_MEMORY = 1;
reserved 2;
reserved "META_BACKEND_ETCD";
Comment from the Contributor Author on lines -10 to -11:
The telemetry service still receives requests from legacy clusters, so it is still possible to get reports from a cluster with an etcd meta backend.
I believe the field was changed by mistake in #18621.
There is no breaking change in the proto file in practice, because there have been no server-side updates since then.

META_BACKEND_ETCD = 2;
META_BACKEND_RDB = 3;
}

@@ -167,3 +166,7 @@
// mark the event is a test message
bool is_test = 11;
}

message BatchEventMessage {
repeated EventMessage events = 1;
}
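
For orientation, here is a minimal sketch (not part of the PR) of how the new batch message travels on the wire with prost. It assumes the prost-generated types `PbBatchEventMessage` and `PbEventMessage` that the Rust changes below import from `risingwave_pb::telemetry`; the helper function names are illustrative.

```rust
// Sketch only: encode a stash of events into one BatchEventMessage and decode
// it back. PbBatchEventMessage / PbEventMessage are the prost-generated types;
// the helper names are illustrative.
use prost::Message;
use risingwave_pb::telemetry::{PbBatchEventMessage, PbEventMessage};

fn encode_batch(events: Vec<PbEventMessage>) -> Vec<u8> {
    PbBatchEventMessage { events }.encode_to_vec()
}

fn decode_batch(bytes: &[u8]) -> Result<PbBatchEventMessage, prost::DecodeError> {
    PbBatchEventMessage::decode(bytes)
}
```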
38 changes: 34 additions & 4 deletions src/common/src/telemetry/report.rs
@@ -14,13 +14,18 @@

use std::sync::Arc;

use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
use risingwave_pb::telemetry::PbEventMessage;
pub use risingwave_telemetry_event::{
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
current_timestamp, do_telemetry_event_report, post_telemetry_report_pb,
TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
};
use risingwave_telemetry_event::{
get_telemetry_risingwave_cloud_uuid, TELEMETRY_EVENT_REPORT_STASH_SIZE,
TELEMETRY_EVENT_REPORT_TX,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use tokio::time::{interval as tokio_interval_fn, Duration};
use uuid::Uuid;

use super::{Result, TELEMETRY_REPORT_INTERVAL};
@@ -60,9 +65,13 @@ where

let begin_time = std::time::Instant::now();
let session_id = Uuid::new_v4().to_string();
let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut event_interval =
tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// fetch telemetry tracking_id from the meta node only at the beginning
// There is only one case tracking_id updated at the runtime ---- metastore data has been
// cleaned. There is no way that metastore has been cleaned but nodes are still running
@@ -91,9 +100,30 @@ where
)
});

let (tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<PbEventMessage>();
TELEMETRY_EVENT_REPORT_TX.set(tx).unwrap_or_else(|_| {
tracing::warn!(
"Telemetry failed to set event reporting tx, event reporting will be disabled"
);
});
let mut event_stash = Vec::new();

loop {
tokio::select! {
_ = interval.tick() => {},
event = event_rx.recv() => {
if let Some(event) = event {
event_stash.push(event);
}
if event_stash.len() >= TELEMETRY_EVENT_REPORT_STASH_SIZE {
do_telemetry_event_report(&mut event_stash).await;
}
continue;
}
_ = event_interval.tick() => {
do_telemetry_event_report(&mut event_stash).await;
continue;
},
_ = &mut shutdown_rx => {
tracing::info!("Telemetry exit");
return;
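
For readers skimming the diff, the reporting loop above combines two triggers: it flushes when the stash reaches TELEMETRY_EVENT_REPORT_STASH_SIZE, or when the event interval ticks. A self-contained sketch of that time-or-size flush pattern, using placeholder names (`Event`, `flush`, the constants) rather than the PR's actual types, looks like this:

```rust
// Standalone sketch of the time-or-size batching pattern used in the reporter
// loop above. Names and constants are placeholders; the PR itself flushes by
// POSTing an encoded PbBatchEventMessage.
use tokio::sync::mpsc;
use tokio::time::{interval, Duration, MissedTickBehavior};

struct Event;

async fn flush(stash: &mut Vec<Event>) {
    if stash.is_empty() {
        return;
    }
    println!("reporting {} events", stash.len());
    stash.clear();
}

async fn report_loop(mut rx: mpsc::UnboundedReceiver<Event>) {
    const STASH_SIZE: usize = 100; // size-based trigger
    let mut tick = interval(Duration::from_secs(10)); // time-based trigger
    tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
    let mut stash: Vec<Event> = Vec::new();

    loop {
        tokio::select! {
            maybe_event = rx.recv() => {
                match maybe_event {
                    Some(event) => stash.push(event),
                    // All senders dropped: flush what is left and exit.
                    None => { flush(&mut stash).await; return; }
                }
                if stash.len() >= STASH_SIZE {
                    flush(&mut stash).await;
                }
            }
            _ = tick.tick() => flush(&mut stash).await,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let reporter = tokio::spawn(report_loop(rx));
    for _ in 0..5 {
        tx.send(Event).unwrap();
    }
    drop(tx); // closing the channel lets the loop flush and exit
    reporter.await.unwrap();
}
```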
34 changes: 24 additions & 10 deletions src/common/telemetry_event/src/lib.rs
@@ -21,9 +21,10 @@ use std::sync::OnceLock;

use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
use tokio::sync::mpsc::UnboundedSender;
pub use util::*;

pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
@@ -32,6 +33,7 @@
pub type TelemetryError = String;

pub static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
pub static TELEMETRY_EVENT_REPORT_TX: OnceLock<UnboundedSender<PbEventMessage>> = OnceLock::new();

pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";

@@ -42,6 +44,23 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
}

pub async fn do_telemetry_event_report(event_stash: &mut Vec<PbEventMessage>) {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "events"; // the batch report url
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
let mut batch_message = PbBatchEventMessage { events: Vec::new() };

for event in event_stash.drain(..) {
batch_message.events.push(event);
}

post_telemetry_report_pb(&url, batch_message.encode_to_vec())
.await
.unwrap_or_else(|e| tracing::debug!("{}", e));
}

pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds
Comment from a Collaborator:
Is the size of the events accumulated in 10s controllable? If not, I am concerned that doing only time-based batching can cause memory pressure / OOM on the node. We might need to consider adding size-based batching as well.

Reply from the Contributor Author:
It makes sense. Let me fix.

pub const TELEMETRY_EVENT_REPORT_STASH_SIZE: usize = 100; // 100 events to trigger a report action

pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
@@ -95,15 +114,10 @@ pub fn request_to_telemetry_event(
node,
is_test,
};
let report_bytes = event.encode_to_vec();

tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});

if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
tx.send(event).unwrap();
}
}

#[cfg(test)]
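
Finally, a hedged sketch of the new send side: with the channel in place, producing an event anywhere in the process is just a channel send, and the background loop in report.rs does the batching and posting. The PR's own code unwraps the send result; ignoring the error below is only this sketch's choice.

```rust
// Caller-side sketch (not the PR's exact code): hand an event to the global
// unbounded channel; the background reporter batches and posts it.
use risingwave_pb::telemetry::PbEventMessage;
use risingwave_telemetry_event::TELEMETRY_EVENT_REPORT_TX;

fn send_event(event: PbEventMessage) {
    if let Some(tx) = TELEMETRY_EVENT_REPORT_TX.get() {
        // A send error only means the reporter task has already shut down.
        let _ = tx.send(event);
    }
    // If the OnceLock was never set, event reporting is disabled and the
    // event is dropped, matching the warning logged in report.rs.
}
```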