Skip to content

Commit

Permalink
Update USER_END event structure
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Nov 14, 2024
1 parent 757f520 commit 3836bdf
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
40 changes: 35 additions & 5 deletions handlers/analytics/user_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,19 @@ type AnalyticsHandler struct {
}

type userEndData struct {
payload *misttriggers.UserEndPayload
UUID string `json:"uuid"`
TimestampMs int64 `json:"timestamp_ms"`
ConnectionToken string `json:"connection_token"`
DownloadedBytes string `json:"downloaded_bytes"`
UploadedBytes string `json:"uploaded_bytes"`
SessionDuration string `json:"session_duration_s"`
StreamID string `json:"stream_id"`
StreamIDCount int `json:"stream_id_count"`
Protocol string `json:"protocol"`
ProtocolCount int `json:"protocol_count"`
IPAddress string `json:"ip_address"`
IPAddressCount int `json:"ip_address_count"`
Tags string `json:"tags"`
}

func NewAnalyticsHandler(cli config.Cli, db *sql.DB) AnalyticsHandler {
Expand Down Expand Up @@ -56,7 +68,7 @@ func (a *AnalyticsHandler) HandleUserEnd(ctx context.Context, payload *misttrigg
if a.writer != nil {
// Using Kafka
select {
case a.dataCh <- userEndData{payload: payload}:
case a.dataCh <- toUserEndData(payload):
// process data async
default:
glog.Warningf("error processing USER_END trigger event, too many triggers in the buffer")
Expand Down Expand Up @@ -144,14 +156,14 @@ func (a *AnalyticsHandler) sendEvents() {

var msgs []kafka.Message
for _, d := range a.events {
key, err := json.Marshal(KafkaKey{SessionID: d.payload.SessionID})
key, err := json.Marshal(KafkaKey{SessionID: d.UUID})
if err != nil {
glog.Errorf("invalid USER_END event, cannot create Kafka key, sessionID=%s, err=%v", d.payload.SessionID, err)
glog.Errorf("invalid USER_END event, cannot create Kafka key, UUID=%s, err=%v", d.UUID, err)
continue
}
value, err := json.Marshal(d)
if err != nil {
glog.Errorf("invalid USER_END event, cannot create Kafka value, sessionID=%s, err=%v", d.payload.SessionID, err)
glog.Errorf("invalid USER_END event, cannot create Kafka value, UUID=%s, err=%v", d.UUID, err)
continue
}
msgs = append(msgs, kafka.Message{Key: key, Value: value})
Expand All @@ -160,3 +172,21 @@ func (a *AnalyticsHandler) sendEvents() {

sendWithRetries(a.writer, msgs)
}

func toUserEndData(payload *misttriggers.UserEndPayload) userEndData {
return userEndData{
UUID: payload.TriggerID,
TimestampMs: time.Now().UnixMilli(),
ConnectionToken: payload.ConnectionToken,
DownloadedBytes: payload.DownloadedBytes,
UploadedBytes: payload.UploadedBytes,
SessionDuration: payload.TimeActiveSecs,
StreamID: payload.StreamNames[len(payload.StreamNames)-1],
StreamIDCount: len(payload.StreamNames),
Protocol: payload.Protocols[len(payload.Protocols)-1],
ProtocolCount: len(payload.Protocols),
IPAddress: payload.IPs[len(payload.IPs)-1],
IPAddressCount: len(payload.IPs),
Tags: strings.Join(payload.Tags, ","),
}
}
24 changes: 12 additions & 12 deletions handlers/misttriggers/user_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ import (

// We only pass these on to the analytics pipeline, so leave as strings for now
type UserEndPayload struct {
TriggerID string
ConnectionToken string
StreamNames []string
IPs []string
TimeActiveSecs string
UploadedBytes string
DownloadedBytes string
Tags []string
PerIPSecs []string
PerProtocolSecs []string
PerStreamSecs []string
SessionID string
TriggerID string `json:"trigger_id"`
ConnectionToken string `json:"connection_token"`
StreamNames []string `json:"stream_names"`
IPs []string `json:"ips"`
TimeActiveSecs string `json:"time_active_secs"`
UploadedBytes string `json:"uploaded_bytes"`
DownloadedBytes string `json:"downloaded_bytes"`
Tags []string `json:"tags"`
PerIPSecs []string `json:"per_ip_secs"`
PerProtocolSecs []string `json:"per_protocol_secs"`
PerStreamSecs []string `json:"per_stream_secs"`
SessionID string `json:"session_id"`

/*
Protocols is a list of the protocols in use for the "user" session. Values can be (not exhaustive):
Expand Down

0 comments on commit 3836bdf

Please sign in to comment.