feat(telemetry): support report event to telemetry #17486

Merged (22 commits) · Jul 26, 2024

Changes from 13 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

43 changes: 43 additions & 0 deletions proto/telemetry.proto
@@ -122,3 +122,46 @@ message FrontendReport {
message CompactorReport {
  ReportBase base = 1;
}
+
+enum TelemetryEventStage {
+  Telemetry_Event_Stage_UNSPECIFIED = 0;
+  Telemetry_Event_Stage_CreateStreamJob = 1;
+  Telemetry_Event_Stage_UpdateStreamJob = 2;
+  Telemetry_Event_Stage_DropStreamJob = 3;
+  Telemetry_Event_Stage_Query = 4;
+  Telemetry_Event_Stage_Recovery = 5;
+}
+
+enum TelemetryDatabaseComponents {
+  Telemetry_Database_Components_UNSPECIFIED = 0;
+  Telemetry_Database_Components_Source = 1;
+  Telemetry_Database_Components_Mv = 2;
+  Telemetry_Database_Components_Table = 3;
+  Telemetry_Database_Components_Sink = 4;
+  Telemetry_Database_Components_Index = 5;
+}
+
+message EventMessage {
+  // tracking_id is persistent in meta data
+  string tracking_id = 1;
+  // session_id is reset every time the node restarts
+  string session_id = 2;
+  // event_time_sec is the unix timestamp (in seconds) at which the event was created
+  uint64 event_time_sec = 3;
+  // event_stage describes in which process the event happens
+  TelemetryEventStage event_stage = 4;
+  // feature_name is the name of the feature that triggered the event
+  string feature_name = 5;
+  // connector_name is the name of the connector involved
+  optional string connector_name = 6;
+  // component is the database component the event relates to
+  optional TelemetryDatabaseComponents component = 10;
+  // catalog_id is the id of the catalog involved (table_id/source_id/...)
+  int64 catalog_id = 7;
+  // attributes carries additional event information, a JSON object serialized to a string
+  optional string attributes = 8;
+  // node is the type of node that created the event
+  string node = 9;
+  // marks the event as a test message
+  bool is_test = 11;
+}
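For a sense of how a collector consumes these events, here is a minimal decoding sketch using the prost-generated types. The type paths and the `Unspecified` variant name assume prost's usual enum prefix stripping and a prost version that generates `TryFrom<i32>` for enums; treat the details as illustrative.

```rust
use prost::Message;
use risingwave_pb::telemetry::{EventMessage, TelemetryEventStage};

// Decode a raw protobuf payload as posted by a node (sketch).
fn decode_event(report_bytes: &[u8]) -> Result<EventMessage, prost::DecodeError> {
    let event = EventMessage::decode(report_bytes)?;
    // `event_stage` travels as an i32 on the wire; map it back to the enum,
    // falling back to UNSPECIFIED for values sent by newer nodes.
    let stage = TelemetryEventStage::try_from(event.event_stage)
        .unwrap_or(TelemetryEventStage::Unspecified);
    println!("stage={stage:?}, feature={}", event.feature_name);
    Ok(event)
}
```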
18 changes: 16 additions & 2 deletions src/common/src/telemetry/manager.rs
@@ -26,21 +26,35 @@
{
    info_fetcher: Arc<I>,
    report_creator: Arc<F>,
+
+    // A callback that stores the tracking_id and session_id on this node;
+    // both ids are also attached to reported telemetry events.
+    set_func_tracking_id_and_session_id: Arc<dyn Fn(String, String) + Send + Sync>,
}

impl<F, I> TelemetryManager<F, I>
where
    F: TelemetryReportCreator + Send + Sync + 'static,
    I: TelemetryInfoFetcher + Send + Sync + 'static,
{
-    pub fn new(info_fetcher: Arc<I>, report_creator: Arc<F>) -> Self {
+    pub fn new(
+        info_fetcher: Arc<I>,
+        report_creator: Arc<F>,
+        set_func_tracking_id_and_session_id: Arc<dyn Fn(String, String) + Send + Sync>,
+    ) -> Self {
        Self {
            info_fetcher,
            report_creator,
+            set_func_tracking_id_and_session_id,
        }
    }

    pub async fn start(&self) -> (JoinHandle<()>, Sender<()>) {
-        start_telemetry_reporting(self.info_fetcher.clone(), self.report_creator.clone()).await
+        start_telemetry_reporting(
+            self.info_fetcher.clone(),
+            self.report_creator.clone(),
+            self.set_func_tracking_id_and_session_id.clone(),
+        )
+        .await
    }
}
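The new third argument is a plain `Arc<dyn Fn(String, String) + Send + Sync>`, so the per-node setter functions passed by the servers below coerce into it without a wrapper closure. A self-contained sketch of that coercion, with a hypothetical setter:

```rust
use std::sync::Arc;

// Hypothetical node-local setter; the real ones store the ids into
// OnceLock cells, as the frontend code later in this diff does.
fn set_ids(tracking_id: String, session_id: String) {
    println!("tracking_id={tracking_id}, session_id={session_id}");
}

fn main() {
    // A plain `fn` item unsize-coerces into the trait object expected by
    // TelemetryManager::new, so callers can pass `Arc::new(set_ids)` directly.
    let setter: Arc<dyn Fn(String, String) + Send + Sync> = Arc::new(set_ids);
    setter("t-123".to_owned(), "s-456".to_owned());
}
```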
59 changes: 58 additions & 1 deletion src/common/src/telemetry/report.rs
@@ -14,12 +14,17 @@

use std::sync::Arc;

+use prost::Message;
+use risingwave_pb::telemetry::{
+    EventMessage as PbEventMessage, PbTelemetryDatabaseComponents,
+    TelemetryEventStage as PbTelemetryEventStage,
+};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use uuid::Uuid;

-use super::{Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
+use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
use crate::telemetry::post_telemetry_report_pb;

@@ -45,6 +50,7 @@ pub trait TelemetryReportCreator {
pub async fn start_telemetry_reporting<F, I>(
    info_fetcher: Arc<I>,
    report_creator: Arc<F>,
+    set_tracking_id_and_session_id: Arc<dyn Fn(String, String) + Send + Sync>,
) -> (JoinHandle<()>, Sender<()>)
where
    F: TelemetryReportCreator + Send + Sync + 'static,
@@ -74,6 +80,7 @@ where
                return;
            }
        };
+        set_tracking_id_and_session_id(tracking_id.clone(), session_id.clone());

        loop {
            tokio::select! {
@@ -112,3 +119,53 @@ where
    });
    (join_handle, shutdown_tx)
}
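The returned pair is the reporting task's handle plus a oneshot shutdown sender; a hypothetical caller tears telemetry down like this (the helper name is illustrative):

```rust
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

// Hypothetical shutdown helper for the (JoinHandle, Sender) pair returned above.
async fn shutdown_telemetry(join_handle: JoinHandle<()>, shutdown_tx: Sender<()>) {
    // Ask the reporting loop to stop, then wait for the task to exit.
    let _ = shutdown_tx.send(());
    let _ = join_handle.await;
}
```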

+pub fn report_event_common(
+    get_tracking_id_and_session_id: Box<dyn Fn() -> (Option<String>, Option<String>) + Send + Sync>,
+    event_stage: PbTelemetryEventStage,
+    feature_name: String,
+    catalog_id: i64,
+    connector_name: Option<String>,
+    components: Option<PbTelemetryDatabaseComponents>,
+    attributes: Option<String>, // any JSON string
+    node: String,
+) {

Review comment on `feature_name: String,` (Member):

> Shall we name it "event_name"? The examples in this PR are normal_recovery, adhoc_recovery, failure_recovery etc., which don't seem to be "features" to me.
>
> Besides, `&str` seems better, because it implies the event name must be a static string, and dynamic values should be put in attributes.
>
> Suggested change:
> -    feature_name: String,
> +    event_name: &str,
+    // If telemetry is disabled, tracking_id and session_id are None and the event is not reported.
+    let event_tracking_id: String;
+    let event_session_id: String;
+    let (tracking_id, session_id) = get_tracking_id_and_session_id();
+    if let Some(tracking_id) = tracking_id
+        && let Some(session_id) = session_id
+    {
+        event_session_id = session_id;
+        event_tracking_id = tracking_id;
+    } else {
+        tracing::info!(
+            "got empty tracking_id or session_id, telemetry is disabled or not initialized"
+        );
+        return;
+    }
+    let event = PbEventMessage {
+        tracking_id: event_tracking_id,
+        session_id: event_session_id,
+        event_time_sec: current_timestamp(),
+        event_stage: event_stage as i32,
+        feature_name,
+        connector_name,
+        component: components.map(|c| c as i32),
+        catalog_id,
+        attributes,
+        node,
+        is_test: false,
+    };
+    let report_bytes = event.encode_to_vec();
+
+    tokio::spawn(async move {
+        const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
+        let url = format!("{}/{}", TELEMETRY_REPORT_URL, TELEMETRY_EVENT_REPORT_TYPE);
+
+        post_telemetry_report_pb(&url, report_bytes)
+            .await
+            .unwrap_or_else(|e| tracing::warn!("{}", e))
+    });
+}
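A hypothetical call site, to show how the arguments fit together. The getter closure, ids, and attribute keys are invented for illustration, serde_json is assumed to be available, and the call must run inside a tokio runtime because the upload is spawned as a task:

```rust
use risingwave_common::telemetry::report::report_event_common;
use risingwave_pb::telemetry::{PbTelemetryDatabaseComponents, PbTelemetryEventStage};

// Hypothetical: report that a Kafka source with catalog id 42 was created.
fn report_source_created() {
    // `attributes` accepts any JSON string; serde_json is one way to build it.
    let attributes = serde_json::json!({ "format": "json" }).to_string();
    report_event_common(
        // Real callers pass their node-local OnceLock getter instead.
        Box::new(|| (Some("t-123".to_owned()), Some("s-456".to_owned()))),
        PbTelemetryEventStage::CreateStreamJob,
        "source_created".to_string(),
        42,
        Some("kafka".to_string()),
        Some(PbTelemetryDatabaseComponents::Source),
        Some(attributes),
        "frontend".to_string(),
    );
}
```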
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
@@ -58,6 +58,7 @@
tokio-stream = "0.1"
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tracing = "0.1"
+uuid = { version = "1.8.0", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
3 changes: 2 additions & 1 deletion src/compute/src/server.rs
@@ -80,7 +80,7 @@
use crate::rpc::service::exchange_service::ExchangeServiceImpl;
use crate::rpc::service::health_service::HealthServiceImpl;
use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
use crate::rpc::service::stream_service::StreamServiceImpl;
-use crate::telemetry::ComputeTelemetryCreator;
+use crate::telemetry::{set_compute_telemetry_tracking_id_and_session_id, ComputeTelemetryCreator};
use crate::ComputeNodeOpts;

/// Bootstraps the compute-node.
@@ -385,6 +385,7 @@ pub async fn compute_node_serve(
    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
+        Arc::new(set_compute_telemetry_tracking_id_and_session_id),
    );

// if the toml config file or env variable disables telemetry, do not watch system params change
1 change: 1 addition & 0 deletions src/compute/src/telemetry.rs
@@ -18,6 +18,7 @@
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::{
    current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
+pub use risingwave_stream::telemetry::set_compute_telemetry_tracking_id_and_session_id;
use serde::{Deserialize, Serialize};

const TELEMETRY_COMPUTE_REPORT_TYPE: &str = "compute";
5 changes: 3 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
@@ -28,6 +28,7 @@ use risingwave_pb::catalog::{
    PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
+use serde_derive::Serialize;

use super::{
    SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
@@ -125,15 +126,15 @@ pub struct SinkFormatDesc {
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
    AppendOnly,
    Upsert,
    Debezium,
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
    Json,
    Protobuf,
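These derives appear to exist so that a sink's format/encode pair can be embedded in an event's `attributes` JSON. A sketch of that use with a hypothetical wrapper struct (not part of this PR; serde_json assumed):

```rust
use serde::Serialize;

use crate::sink::catalog::{SinkEncode, SinkFormat};

// Hypothetical attribute payload; the field names are illustrative.
#[derive(Serialize)]
struct SinkTelemetryAttrs {
    format: SinkFormat,
    encode: SinkEncode,
}

fn sink_attrs_json(format: SinkFormat, encode: SinkEncode) -> serde_json::Result<String> {
    // Unit variants serialize as their names, e.g. {"format":"Upsert","encode":"Json"}.
    serde_json::to_string(&SinkTelemetryAttrs { format, encode })
}
```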
5 changes: 4 additions & 1 deletion src/frontend/src/session.rs
@@ -109,7 +109,9 @@ use crate::scheduler::{
    DistributedQueryMetrics, HummockSnapshotManager, HummockSnapshotManagerRef, QueryManager,
    GLOBAL_DISTRIBUTED_QUERY_METRICS,
};
-use crate::telemetry::FrontendTelemetryCreator;
+use crate::telemetry::{
+    set_frontend_telemetry_tracking_id_and_session_id, FrontendTelemetryCreator,
+};
use crate::user::user_authentication::md5_hash_with_salt;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
@@ -355,6 +357,7 @@ impl FrontendEnv {
        let telemetry_manager = TelemetryManager::new(
            Arc::new(meta_client.clone()),
            Arc::new(FrontendTelemetryCreator::new()),
+            Arc::new(set_frontend_telemetry_tracking_id_and_session_id),
        );

// if the toml config file or env variable disables telemetry, do not watch system params
40 changes: 39 additions & 1 deletion src/frontend/src/telemetry.rs
@@ -12,16 +12,54 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+use std::sync::OnceLock;
+
+use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
-use risingwave_common::telemetry::report::TelemetryReportCreator;
+use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator};
use risingwave_common::telemetry::{
    current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
+use risingwave_pb::telemetry::{PbTelemetryDatabaseComponents, PbTelemetryEventStage};
use serde::{Deserialize, Serialize};

const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend";

+static FRONTEND_TELEMETRY_SESSION_ID: OnceLock<String> = OnceLock::new();
+static FRONTEND_TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
+
+pub fn set_frontend_telemetry_tracking_id_and_session_id(tracking_id: String, session_id: String) {
+    FRONTEND_TELEMETRY_TRACKING_ID.set(tracking_id).unwrap();
+    FRONTEND_TELEMETRY_SESSION_ID.set(session_id).unwrap();
+}

+fn _get_frontend_telemetry_tracking_id_and_session_id() -> (Option<String>, Option<String>) {
+    (
+        FRONTEND_TELEMETRY_TRACKING_ID.get().cloned(),
+        FRONTEND_TELEMETRY_SESSION_ID.get().cloned(),
+    )
+}

Review thread on the leading underscore (Member):

> Suggested change:
> -fn _get_frontend_telemetry_tracking_id_and_session_id() -> (Option<String>, Option<String>) {
> +fn get_frontend_telemetry_tracking_id_and_session_id() -> (Option<String>, Option<String>) {
>
> Are you trying to say #[allow(unused)] or something?

Reply (Contributor, author):

> Yes. I am not sure we want such an attribute: #[allow(dead_code)] caused some trouble in `source`, and @xxchan complained about it.

Reply (Member):

> I complained about using #![allow(dead_code)] at the top level.
>
> • Using it on individual functions is OK.
> • Whether top-level or function-level, #[expect(dead_code, reason = "xxx")] is better, because it states the reason and tells you to remove it once the lint no longer fires.

+pub(crate) fn _report_event(
+    event_stage: PbTelemetryEventStage,
+    feature_name: String,
+    catalog_id: i64,
+    connector_name: Option<String>,
+    component: Option<PbTelemetryDatabaseComponents>,
+    attributes: Option<String>, // any JSON string
+) {
+    report_event_common(
+        Box::new(_get_frontend_telemetry_tracking_id_and_session_id),
+        event_stage,
+        feature_name,
+        catalog_id,
+        connector_name,
+        component,
+        attributes,
+        TELEMETRY_FRONTEND_REPORT_TYPE.to_string(),
+    );
+}

#[derive(Clone, Copy)]
pub(crate) struct FrontendTelemetryCreator {}

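Nothing in this file calls `_report_event` yet. A hypothetical frontend call site might look like the following; the event name, connector, and id are illustrative:

```rust
// Hypothetical: report a DROP SINK handled by this frontend.
fn report_drop_sink(sink_id: u32) {
    _report_event(
        PbTelemetryEventStage::DropStreamJob,
        "drop_sink".to_string(),
        sink_id as i64,
        Some("iceberg".to_string()),
        Some(PbTelemetryDatabaseComponents::Sink),
        None, // no extra attributes
    );
}
```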
5 changes: 4 additions & 1 deletion src/meta/node/src/server.rs
@@ -103,7 +103,9 @@ use crate::rpc::metrics::{
use crate::serving::ServingVnodeMapping;
use crate::storage::{EtcdMetaStore, MemStore, MetaStoreBoxExt, WrappedEtcdClient as EtcdClient};
use crate::stream::{GlobalStreamManager, SourceManager};
-use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
+use crate::telemetry::{
+    set_meta_telemetry_tracking_id_and_session_id, MetaReportCreator, MetaTelemetryInfoFetcher,
+};
use crate::{hummock, serving, MetaError, MetaResult};

/// Used for standalone mode checking the status of the meta service.
@@ -743,6 +745,7 @@ pub async fn start_service_as_election_leader(
            metadata_manager.clone(),
            env.meta_store().backend(),
        )),
+        Arc::new(set_meta_telemetry_tracking_id_and_session_id),
    );

// May start telemetry reporting
26 changes: 26 additions & 0 deletions src/meta/src/barrier/mod.rs
@@ -574,6 +574,14 @@ impl GlobalBarrierManager {
        self.context
            .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
        let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);
+        crate::telemetry::report_event(
+            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
+            "normal_recovery".to_string(),
+            0,
+            None,
+            None,
+            None,
+        );

        let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
        let paused_reason = paused.then_some(PausedReason::Manual);
@@ -805,6 +813,15 @@ impl GlobalBarrierManager {
            prev_epoch = prev_epoch.value().0
        );

+        crate::telemetry::report_event(
+            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
+            "failure_recovery".to_string(),
+            0,
+            None,
+            None,
+            None,
+        );
+
        // No need to clean dirty tables for barrier recovery,
        // The foreground stream job should cleanup their own tables.
        self.recovery(None).instrument(span).await;
@@ -830,6 +847,15 @@
            prev_epoch = prev_epoch.value().0
        );

+        crate::telemetry::report_event(
+            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
+            "adhoc_recovery".to_string(),
+            0,
+            None,
+            None,
+            None,
+        );
+
        // No need to clean dirty tables for barrier recovery,
        // The foreground stream job should cleanup their own tables.
        self.recovery(None).instrument(span).await;
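The meta-side `crate::telemetry::report_event` called in these hunks is not shown in this diff view; presumably it mirrors the frontend wrapper. A sketch under that assumption (the statics and getter are guessed, not confirmed by this PR):

```rust
use std::sync::OnceLock;

use risingwave_common::telemetry::report::report_event_common;
use risingwave_pb::telemetry::{PbTelemetryDatabaseComponents, PbTelemetryEventStage};

// Guessed meta-local id storage, mirroring the frontend's OnceLock cells.
static META_TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
static META_TELEMETRY_SESSION_ID: OnceLock<String> = OnceLock::new();

fn get_meta_telemetry_tracking_id_and_session_id() -> (Option<String>, Option<String>) {
    (
        META_TELEMETRY_TRACKING_ID.get().cloned(),
        META_TELEMETRY_SESSION_ID.get().cloned(),
    )
}

// Assumed shape of the meta-side wrapper, delegating to report_event_common.
pub(crate) fn report_event(
    event_stage: PbTelemetryEventStage,
    feature_name: String,
    catalog_id: i64,
    connector_name: Option<String>,
    component: Option<PbTelemetryDatabaseComponents>,
    attributes: Option<String>,
) {
    report_event_common(
        Box::new(get_meta_telemetry_tracking_id_and_session_id),
        event_stage,
        feature_name,
        catalog_id,
        connector_name,
        component,
        attributes,
        "meta".to_string(),
    );
}
```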