Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): support report event to telemetry (#17486) #17835

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions proto/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,44 @@ message FrontendReport {
message CompactorReport {
ReportBase base = 1;
}

enum TelemetryEventStage {
TELEMETRY_EVENT_STAGE_UNSPECIFIED = 0;
TELEMETRY_EVENT_STAGE_CREATE_STREAM_JOB = 1;
TELEMETRY_EVENT_STAGE_UPDATE_STREAM_JOB = 2;
TELEMETRY_EVENT_STAGE_DROP_STREAM_JOB = 3;
TELEMETRY_EVENT_STAGE_QUERY = 4;
TELEMETRY_EVENT_STAGE_RECOVERY = 5;
}

enum TelemetryDatabaseObject {
TELEMETRY_DATABASE_OBJECT_UNSPECIFIED = 0;
TELEMETRY_DATABASE_OBJECT_SOURCE = 1;
TELEMETRY_DATABASE_OBJECT_MV = 2;
TELEMETRY_DATABASE_OBJECT_TABLE = 3;
TELEMETRY_DATABASE_OBJECT_SINK = 4;
TELEMETRY_DATABASE_OBJECT_INDEX = 5;
}

message EventMessage {
// tracking_id is persistent in meta data
string tracking_id = 1;
// event_time is when the event is created
uint64 event_time_sec = 3;
// event_stage describes in which process the event happens
TelemetryEventStage event_stage = 4;
// feature_name is the name of the feature triggered the event
string event_name = 5;
// connector_name is the name of the connector involves
optional string connector_name = 6;
// connector_direction is the direction of data flow, can be source or sink
optional TelemetryDatabaseObject object = 10;
// catalog_id is the id of the catalog involves (table_id/source_id/...)
int64 catalog_id = 7;
// attributes is the additional information of the event: json format ser to string
optional string attributes = 8;
// node is the node that creates the event
string node = 9;
// mark the event is a test message
bool is_test = 11;
}
114 changes: 112 additions & 2 deletions src/common/src/telemetry/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use prost::Message;
use risingwave_pb::telemetry::{
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
TelemetryEventStage as PbTelemetryEventStage,
};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::{interval, Duration};
use uuid::Uuid;

use super::{Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
use crate::telemetry::post_telemetry_report_pb;

Expand All @@ -42,6 +47,8 @@ pub trait TelemetryReportCreator {
fn report_type(&self) -> &str;
}

static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();

pub async fn start_telemetry_reporting<F, I>(
info_fetcher: Arc<I>,
report_creator: Arc<F>,
Expand Down Expand Up @@ -74,6 +81,13 @@ where
return;
}
};
TELEMETRY_TRACKING_ID
.set(tracking_id.clone())
.unwrap_or_else(|_| {
tracing::warn!(
"Telemetry failed to set tracking_id, event reporting will be disabled"
)
});

loop {
tokio::select! {
Expand Down Expand Up @@ -112,3 +126,99 @@ where
});
(join_handle, shutdown_tx)
}

pub fn report_event_common(
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
node: String,
) {
let event_tracking_id: String;
if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
event_tracking_id = tracking_id.to_string();
} else {
tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
return;
}

request_to_telemetry_event(
event_tracking_id,
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
false,
);
}

fn request_to_telemetry_event(
tracking_id: String,
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
object: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
node: String,
is_test: bool,
) {
let event = PbEventMessage {
tracking_id,
event_time_sec: current_timestamp(),
event_stage: event_stage as i32,
event_name: event_name.to_string(),
connector_name,
object: object.map(|c| c as i32),
catalog_id,
attributes: attributes.map(|a| a.to_string()),
node,
is_test,
};
let report_bytes = event.encode_to_vec();

tokio::spawn(async move {
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
post_telemetry_report_pb(&url, report_bytes)
.await
.unwrap_or_else(|e| tracing::info!("{}", e))
});
}

#[cfg(test)]
mod test {

use super::*;

#[ignore]
#[tokio::test]
async fn test_telemetry_report_event() {
let event_stage = PbTelemetryEventStage::CreateStreamJob;
let event_name = "test_feature";
let catalog_id = 1;
let connector_name = Some("test_connector".to_string());
let object = Some(PbTelemetryDatabaseObject::Source);
let attributes = None;
let node = "test_node".to_string();

request_to_telemetry_event(
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
event_stage,
event_name,
catalog_id,
connector_name,
object,
attributes,
node,
true,
);

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tokio-stream = "0.1"
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
tracing = "0.1"
uuid = { version = "1.8.0", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
Expand Down
18 changes: 16 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod desc;

use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
Expand All @@ -28,6 +29,7 @@ use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;
use serde_derive::Serialize;

use super::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
Expand Down Expand Up @@ -125,15 +127,21 @@ pub struct SinkFormatDesc {
}

/// TODO: consolidate with [`crate::source::SourceFormat`] and [`crate::parser::ProtocolProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkFormat {
AppendOnly,
Upsert,
Debezium,
}

impl Display for SinkFormat {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

/// TODO: consolidate with [`crate::source::SourceEncode`] and [`crate::parser::EncodingProperties`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub enum SinkEncode {
Json,
Protobuf,
Expand All @@ -142,6 +150,12 @@ pub enum SinkEncode {
Text,
}

impl Display for SinkEncode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl SinkFormatDesc {
pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
use crate::sink::kafka::KafkaSink;
Expand Down
23 changes: 22 additions & 1 deletion src/frontend/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,35 @@

use prost::Message;
use risingwave_common::telemetry::pb_compatible::TelemetryToProtobuf;
use risingwave_common::telemetry::report::TelemetryReportCreator;
use risingwave_common::telemetry::report::{report_event_common, TelemetryReportCreator};
use risingwave_common::telemetry::{
current_timestamp, SystemData, TelemetryNodeType, TelemetryReportBase, TelemetryResult,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use serde::{Deserialize, Serialize};

const TELEMETRY_FRONTEND_REPORT_TYPE: &str = "frontend";

#[allow(dead_code)] // please remove when used
pub(crate) fn report_event(
event_stage: PbTelemetryEventStage,
event_name: &str,
catalog_id: i64,
connector_name: Option<String>,
component: Option<PbTelemetryDatabaseObject>,
attributes: Option<jsonbb::Value>, // any json string
) {
report_event_common(
event_stage,
event_name,
catalog_id,
connector_name,
component,
attributes,
TELEMETRY_FRONTEND_REPORT_TYPE.to_string(),
);
}

#[derive(Clone, Copy)]
pub(crate) struct FrontendTelemetryCreator {}

Expand Down
21 changes: 11 additions & 10 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
hyper = "0.14" # required by tonic
itertools = { workspace = true }
jsonbb = { workspace = true }
maplit = "1.0.2"
memcomparable = { version = "0.2" }
mime_guess = "2"
Expand Down Expand Up @@ -71,12 +72,12 @@ sync-point = { path = "../utils/sync-point" }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
"signal",
] }
tokio-retry = "0.3"
tokio-stream = { version = "0.1", features = ["net"] }
Expand All @@ -89,10 +90,10 @@ uuid = { version = "1", features = ["v4"] }
[target.'cfg(not(madsim))'.dependencies]
axum = { workspace = true }
tower-http = { version = "0.5", features = [
"add-extension",
"cors",
"fs",
"compression-gzip",
"add-extension",
"cors",
"fs",
"compression-gzip",
] }
workspace-hack = { path = "../workspace-hack" }

Expand Down
Loading
Loading