feat: batching telemetry event request avoid too many requests #20000

Merged
merged 10 commits into from
Jan 8, 2025
7 changes: 5 additions & 2 deletions proto/telemetry.proto
@@ -4,11 +4,10 @@

option go_package = "risingwavelabs.com/risingwave/proto/telemetry";

enum MetaBackend {

Check failure on line 7 in proto/telemetry.proto

GitHub Actions / Check breaking changes in Protobuf files

Previously present reserved name "META_BACKEND_ETCD" on enum "MetaBackend" was deleted.

Check failure on line 7 in proto/telemetry.proto

GitHub Actions / Check breaking changes in Protobuf files

Previously present reserved range "[2]" on enum "MetaBackend" is missing values: [2] were removed.
META_BACKEND_UNSPECIFIED = 0;
META_BACKEND_MEMORY = 1;
reserved 2;
reserved "META_BACKEND_ETCD";
Comment on lines -10 to -11
Contributor Author

The telemetry service still receives requests from legacy clusters, so it can still get reports from a cluster with an etcd backend.
I believe the field was changed by mistake in #18621.
This is not a breaking change to the proto file in practice, because the server side has not been updated since then.
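
To illustrate the point about legacy clusters, here is a minimal sketch of how a receiver might map the raw wire value back to a named backend. The enum below is a hand-written mirror of `MetaBackend`, not the prost-generated type, and `from_wire` is a hypothetical helper. With value 2 kept as `META_BACKEND_ETCD`, a report from an etcd-backed cluster still resolves to a name; if 2 were only reserved, it would fall into the unknown case.

```rust
/// Hand-written mirror of the `MetaBackend` enum after this change.
/// This is an illustration, not the code prost generates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetaBackend {
    Unspecified = 0,
    Memory = 1,
    Etcd = 2,
    Rdb = 3,
}

impl MetaBackend {
    /// Hypothetical helper: map a raw wire value to a named variant,
    /// or `None` when the value has no name in this schema.
    pub fn from_wire(value: i32) -> Option<Self> {
        match value {
            0 => Some(Self::Unspecified),
            1 => Some(Self::Memory),
            2 => Some(Self::Etcd),
            3 => Some(Self::Rdb),
            _ => None,
        }
    }
}

fn main() {
    // A legacy cluster backed by etcd still reports value 2.
    println!("{:?}", MetaBackend::from_wire(2));
    // A value with no name in the schema cannot be resolved.
    println!("{:?}", MetaBackend::from_wire(42));
}
```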

META_BACKEND_ETCD = 2;
META_BACKEND_RDB = 3;
}

@@ -167,3 +166,7 @@
// mark the event is a test message
bool is_test = 11;
}

message BatchEventMessage {
repeated EventMessage events = 1;
}
15 changes: 12 additions & 3 deletions src/common/src/telemetry/report.rs
@@ -16,11 +16,12 @@ use std::sync::Arc;

use risingwave_telemetry_event::get_telemetry_risingwave_cloud_uuid;
pub use risingwave_telemetry_event::{
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
current_timestamp, do_telemetry_event_report, post_telemetry_report_pb,
TELEMETRY_EVENT_REPORT_INTERVAL, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use tokio::time::{interval as tokio_interval_fn, Duration};
use uuid::Uuid;

use super::{Result, TELEMETRY_REPORT_INTERVAL};
@@ -60,9 +61,13 @@ where

let begin_time = std::time::Instant::now();
let session_id = Uuid::new_v4().to_string();
let mut interval = interval(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
let mut interval = tokio_interval_fn(Duration::from_secs(TELEMETRY_REPORT_INTERVAL));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut event_interval =
tokio_interval_fn(Duration::from_secs(TELEMETRY_EVENT_REPORT_INTERVAL));
event_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// fetch telemetry tracking_id from the meta node only at the beginning
// There is only one case tracking_id updated at the runtime ---- metastore data has been
// cleaned. There is no way that metastore has been cleaned but nodes are still running
@@ -94,6 +99,10 @@ where
loop {
tokio::select! {
_ = interval.tick() => {},
_ = event_interval.tick() => {
do_telemetry_event_report().await;
continue;
},
_ = &mut shutdown_rx => {
tracing::info!("Telemetry exit");
return;
37 changes: 26 additions & 11 deletions src/common/telemetry_event/src/lib.rs
@@ -17,13 +17,14 @@
mod util;

use std::env;
use std::sync::OnceLock;
use std::sync::{LazyLock, OnceLock};

use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
EventMessage as PbEventMessage, PbBatchEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
use tokio::sync::Mutex;
pub use util::*;

pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
@@ -42,6 +43,27 @@ pub fn get_telemetry_risingwave_cloud_uuid() -> Option<String> {
env::var(TELEMETRY_RISINGWAVE_CLOUD_UUID).ok()
}

static TELEMETRY_EVENT_REPORT_STASH: LazyLock<Mutex<Vec<PbEventMessage>>> =
LazyLock::new(|| Mutex::new(Vec::new()));

pub async fn do_telemetry_event_report() {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
let mut batch_message = PbBatchEventMessage { events: Vec::new() };

let mut stash_guard = TELEMETRY_EVENT_REPORT_STASH.lock().await;
for event in stash_guard.drain(..) {
batch_message.events.push(event);
}
drop(stash_guard);

post_telemetry_report_pb(&url, batch_message.encode_to_vec())
.await
.unwrap_or_else(|e| tracing::debug!("{}", e));
}

pub const TELEMETRY_EVENT_REPORT_INTERVAL: u64 = 10; // 10 seconds
Collaborator

Is the size of the events accumulated in 10s controllable? If not, I am concerned that doing only time-based batching can cause memory pressure / OOM on the node. We might need to consider adding size-based batching as well.

Contributor Author

That makes sense. Let me fix it.
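
One way the size-based batching raised above could look is sketched below. It is not the fix that landed in the PR. `Event` stands in for `PbEventMessage` so the example compiles without prost, and `EventStash` and `MAX_STASH_LEN` are hypothetical names with an arbitrary cap. The idea is that `push` hands back a full batch as soon as the cap is reached, so the buffer never grows past a fixed size between timer ticks.

```rust
use std::sync::Mutex;

/// Hypothetical cap on buffered events; not a value taken from the PR.
pub const MAX_STASH_LEN: usize = 100;

/// Stand-in for `PbEventMessage` so the sketch has no external dependencies.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub name: String,
}

/// Buffer that is flushed either by the timer or when it reaches the cap.
pub struct EventStash {
    events: Mutex<Vec<Event>>,
}

impl EventStash {
    pub fn new() -> Self {
        Self { events: Mutex::new(Vec::new()) }
    }

    /// Push an event. Returns `Some(batch)` when the cap is reached, so the
    /// caller can send it right away instead of waiting for the next tick.
    pub fn push(&self, event: Event) -> Option<Vec<Event>> {
        let mut guard = self.events.lock().unwrap();
        guard.push(event);
        if guard.len() >= MAX_STASH_LEN {
            // Take the whole buffer and leave an empty Vec behind.
            Some(std::mem::take(&mut *guard))
        } else {
            None
        }
    }

    /// Called on every timer tick: take whatever has accumulated so far.
    pub fn drain(&self) -> Vec<Event> {
        std::mem::take(&mut *self.events.lock().unwrap())
    }
}

fn main() {
    let stash = EventStash::new();
    for i in 0..MAX_STASH_LEN - 1 {
        assert!(stash.push(Event { name: format!("e{}", i) }).is_none());
    }
    let batch = stash
        .push(Event { name: "last".to_string() })
        .expect("cap reached");
    println!("flushed {} events early", batch.len());
    println!("left for the timer: {}", stash.drain().len());
}
```

A cap on event count bounds memory only if individual events are themselves small; a byte-size budget would be a stricter variant of the same idea.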


pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
@@ -95,15 +117,8 @@ pub fn request_to_telemetry_event(
node,
is_test,
};
let report_bytes = event.encode_to_vec();

tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});

TELEMETRY_EVENT_REPORT_STASH.blocking_lock().push(event);
Collaborator

Given that we call blocking_lock here, why not just use a non-async mutex (parking_lot::Mutex) instead of the tokio async mutex?

Contributor Author

After switching to an unbounded channel, there seems to be no need for the sync Mutex.
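
A rough sketch of the channel-based shape described in this reply follows. The thread does not say which channel type the PR adopted, so this uses the standard library's `std::sync::mpsc::channel` (which is unbounded) as a stand-in; `Event`, `report_event`, and `drain_batch` are hypothetical names. The producer side can be called from synchronous code without taking any lock, which also sidesteps `blocking_lock`, a call that tokio documents as panicking when used inside an asynchronous execution context.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for `PbEventMessage` so the sketch has no external dependencies.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub name: String,
}

/// Producer side: callable from synchronous code and never blocks,
/// because the channel is unbounded.
pub fn report_event(tx: &Sender<Event>, name: &str) {
    // A send error only means the receiver is gone; telemetry is best effort.
    let _ = tx.send(Event { name: name.to_owned() });
}

/// Consumer side: collect everything queued so far without waiting.
/// In the reporting loop this would run on each event-interval tick.
pub fn drain_batch(rx: &Receiver<Event>) -> Vec<Event> {
    rx.try_iter().collect()
}

fn main() {
    let (tx, rx) = channel();
    report_event(&tx, "create_source");
    report_event(&tx, "create_sink");
    let batch = drain_batch(&rx);
    println!("batched {} events", batch.len());
}
```

An unbounded channel removes the lock but not the memory concern raised earlier in the review, so some size limit on the consumer or producer side would still be needed.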

}

#[cfg(test)]