From 3836bdf88e3c21617d17835eb77206d4cc50c129 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Thu, 14 Nov 2024 19:16:35 +0100 Subject: [PATCH] Update USER_END event structure --- handlers/analytics/user_end.go | 40 +++++++++++++++++++++++++++---- handlers/misttriggers/user_end.go | 24 +++++++++---------- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/handlers/analytics/user_end.go b/handlers/analytics/user_end.go index 8fae5e31b..949ff9a4b 100644 --- a/handlers/analytics/user_end.go +++ b/handlers/analytics/user_end.go @@ -27,7 +27,19 @@ type AnalyticsHandler struct { } type userEndData struct { - payload *misttriggers.UserEndPayload + UUID string `json:"uuid"` + TimestampMs int64 `json:"timestamp_ms"` + ConnectionToken string `json:"connection_token"` + DownloadedBytes string `json:"downloaded_bytes"` + UploadedBytes string `json:"uploaded_bytes"` + SessionDuration string `json:"session_duration_s"` + StreamID string `json:"stream_id"` + StreamIDCount int `json:"stream_id_count"` + Protocol string `json:"protocol"` + ProtocolCount int `json:"protocol_count"` + IPAddress string `json:"ip_address"` + IPAddressCount int `json:"ip_address_count"` + Tags string `json:"tags"` } func NewAnalyticsHandler(cli config.Cli, db *sql.DB) AnalyticsHandler { @@ -56,7 +68,7 @@ func (a *AnalyticsHandler) HandleUserEnd(ctx context.Context, payload *misttrigg if a.writer != nil { // Using Kafka select { - case a.dataCh <- userEndData{payload: payload}: + case a.dataCh <- toUserEndData(payload): // process data async default: glog.Warningf("error processing USER_END trigger event, too many triggers in the buffer") @@ -144,14 +156,14 @@ func (a *AnalyticsHandler) sendEvents() { var msgs []kafka.Message for _, d := range a.events { - key, err := json.Marshal(KafkaKey{SessionID: d.payload.SessionID}) + key, err := json.Marshal(KafkaKey{SessionID: d.UUID}) if err != nil { - glog.Errorf("invalid USER_END event, cannot create Kafka key, sessionID=%s, err=%v", d.payload.SessionID, err) + glog.Errorf("invalid USER_END event, cannot create Kafka key, UUID=%s, err=%v", d.UUID, err) continue } value, err := json.Marshal(d) if err != nil { - glog.Errorf("invalid USER_END event, cannot create Kafka value, sessionID=%s, err=%v", d.payload.SessionID, err) + glog.Errorf("invalid USER_END event, cannot create Kafka value, UUID=%s, err=%v", d.UUID, err) continue } msgs = append(msgs, kafka.Message{Key: key, Value: value}) @@ -160,3 +172,21 @@ func (a *AnalyticsHandler) sendEvents() { sendWithRetries(a.writer, msgs) } + +func toUserEndData(payload *misttriggers.UserEndPayload) userEndData { + return userEndData{ + UUID: payload.TriggerID, + TimestampMs: time.Now().UnixMilli(), + ConnectionToken: payload.ConnectionToken, + DownloadedBytes: payload.DownloadedBytes, + UploadedBytes: payload.UploadedBytes, + SessionDuration: payload.TimeActiveSecs, + StreamID: payload.StreamNames[len(payload.StreamNames)-1], + StreamIDCount: len(payload.StreamNames), + Protocol: payload.Protocols[len(payload.Protocols)-1], + ProtocolCount: len(payload.Protocols), + IPAddress: payload.IPs[len(payload.IPs)-1], + IPAddressCount: len(payload.IPs), + Tags: strings.Join(payload.Tags, ","), + } +} diff --git a/handlers/misttriggers/user_end.go b/handlers/misttriggers/user_end.go index f6a59a1cf..bfcbcebf1 100644 --- a/handlers/misttriggers/user_end.go +++ b/handlers/misttriggers/user_end.go @@ -11,18 +11,18 @@ import ( // We only pass these on to the analytics pipeline, so leave as strings for now type UserEndPayload struct { - TriggerID string - ConnectionToken string - StreamNames []string - IPs []string - TimeActiveSecs string - UploadedBytes string - DownloadedBytes string - Tags []string - PerIPSecs []string - PerProtocolSecs []string - PerStreamSecs []string - SessionID string + TriggerID string `json:"trigger_id"` + ConnectionToken string `json:"connection_token"` + StreamNames []string `json:"stream_names"` + IPs []string `json:"ips"` + TimeActiveSecs string `json:"time_active_secs"` + UploadedBytes string `json:"uploaded_bytes"` + DownloadedBytes string `json:"downloaded_bytes"` + Tags []string `json:"tags"` + PerIPSecs []string `json:"per_ip_secs"` + PerProtocolSecs []string `json:"per_protocol_secs"` + PerStreamSecs []string `json:"per_stream_secs"` + SessionID string `json:"session_id"` /* Protocols is a list of the protocols in use for the "user" session. Values can be (not exhaustive):