From d2d4f40fd334710dd397432f0b21a5ca93144162 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rafa=C5=82=20Leszko?=
Date: Fri, 15 Nov 2024 10:18:23 +0100
Subject: [PATCH] Add sending USER_END events to Kafka (#1388)

---
 api/http.go                         |  20 +-
 api/http_internal.go                |   2 +-
 config/cli.go                       |   1 +
 handlers/analytics/kafka.go         |  59 +++++++
 handlers/analytics/log_processor.go |  60 ++-------
 handlers/analytics/user_end.go      | 188 ++++++++++++++++++++++------
 handlers/misttriggers/user_end.go   |  24 ++--
 main.go                             |   1 +
 8 files changed, 243 insertions(+), 112 deletions(-)
 create mode 100644 handlers/analytics/kafka.go

diff --git a/api/http.go b/api/http.go
index 15992a0ca..7cae2c624 100644
--- a/api/http.go
+++ b/api/http.go
@@ -5,7 +5,6 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/golang/glog"
 	"github.com/julienschmidt/httprouter"
 	"github.com/livepeer/catalyst-api/balancer"
 	"github.com/livepeer/catalyst-api/config"
@@ -64,17 +63,14 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
 	router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
 
 	if cli.EnableAnalytics == "true" || cli.EnableAnalytics == "enabled" {
-		logProcessor, err := analytics.NewLogProcessor(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.AnalyticsKafkaTopic)
-		if err != nil {
-			glog.Fatalf("failed to configure analytics log processor, err=%v", err)
-		} else {
-			analyticsApiHandlers := handlers.NewAnalyticsHandlersCollection(mapic, lapi, logProcessor)
-			router.POST("/analytics/log", withCORS(analyticsApiHandlers.Log()))
-			// Redirect GET /analytics/log to the specific catalyst node, e.g. "mdw-staging-staging-catalyst-0.livepeer.monster"
-			// This is useful for the player, because then it can stick to one node while sending analytics logs
-			router.GET("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
-			router.HEAD("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
-		}
+		logProcessor := analytics.NewLogProcessor(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.AnalyticsKafkaTopic)
+
+		analyticsApiHandlers := handlers.NewAnalyticsHandlersCollection(mapic, lapi, logProcessor)
+		router.POST("/analytics/log", withCORS(analyticsApiHandlers.Log()))
+		// Redirect GET /analytics/log to the specific catalyst node, e.g. "mdw-staging-staging-catalyst-0.livepeer.monster"
+		// This is useful for the player, because then it can stick to one node while sending analytics logs
+		router.GET("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
+		router.HEAD("/analytics/log", withLogging(withCORS(geoHandlers.RedirectConstPathHandler())))
 	}
 
 	// Playback endpoint
diff --git a/api/http_internal.go b/api/http_internal.go
index 30589e5bb..fece0f796 100644
--- a/api/http_internal.go
+++ b/api/http_internal.go
@@ -77,7 +77,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
 	eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal, eventsEndpoint)
 	ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
 	accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
-	analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
+	analyticsHandlers := analytics.NewAnalyticsHandler(cli, metricsDB)
 	encryptionHandlers := accesscontrol.NewEncryptionHandlersCollection(cli, spkiPublicKey)
 	adminHandlers := &admin.AdminHandlersCollection{Cluster: c}
 	mistCallbackHandlers := misttriggers.NewMistCallbackHandlersCollection(cli, broker)
diff --git a/config/cli.go b/config/cli.go
index 9cdc5e6eb..5c1d3a799 100644
--- a/config/cli.go
+++ b/config/cli.go
@@ -73,6 +73,7 @@ type Cli struct {
 	KafkaUser             string
 	KafkaPassword         string
 	AnalyticsKafkaTopic   string
+	UserEndKafkaTopic     string
 	SerfMembersEndpoint   string
 	EventsEndpoint        string
 	CatalystApiURL        string
diff --git a/handlers/analytics/kafka.go b/handlers/analytics/kafka.go
new file mode 100644
index 000000000..912eeb3ca
--- /dev/null
+++ b/handlers/analytics/kafka.go
@@ -0,0 +1,59 @@
+package analytics
+
+import (
+	"context"
+	"crypto/tls"
+
+	"github.com/golang/glog"
+	"github.com/livepeer/catalyst-api/metrics"
+	"github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go/sasl/plain"
+)
+
+func sendWithRetries(writer *kafka.Writer, msgs []kafka.Message) {
+	// We retry sending messages to Kafka in case of a failure
+	// We don't use any backoff, because the number of events grows very quickly, so in case of a failure
+	// it's better to lose events than fill up the memory and crash the whole catalyst-api
+	kafkaWriteRetries := 3
+	var err error
+	for i := 0; i < kafkaWriteRetries; i++ {
+		err = writer.WriteMessages(context.Background(), msgs...)
+		if err == nil {
+			return
+		} else {
+			glog.Warningf("error while sending analytics log to Kafka, retrying, try=%d, err=%v", i, err)
+		}
+	}
+	metrics.Metrics.AnalyticsMetrics.LogProcessorWriteErrors.Inc()
+	glog.Errorf("error while sending events to Kafka, the events are lost, err=%v", err)
+}
+
+func logWriteMetrics(writer *kafka.Writer) {
+	stats := writer.Stats()
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteErrors.Add(float64(stats.Errors))
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteMessages.Add(float64(stats.Messages))
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteAvgTime.Observe(stats.WriteTime.Avg.Seconds())
+	metrics.Metrics.AnalyticsMetrics.KafkaWriteRetries.Add(float64(stats.Retries))
+}
+
+func newWriter(bootstrapServers, user, password, topic string) *kafka.Writer {
+	dialer := &kafka.Dialer{
+		Timeout: kafkaRequestTimeout,
+		SASLMechanism: plain.Mechanism{
+			Username: user,
+			Password: password,
+		},
+		DualStack: true,
+		TLS: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+		},
+	}
+
+	// Create a new Kafka writer
+	return kafka.NewWriter(kafka.WriterConfig{
+		Brokers:  []string{bootstrapServers},
+		Topic:    topic,
+		Balancer: kafka.CRC32Balancer{},
+		Dialer:   dialer,
+	})
+}
diff --git a/handlers/analytics/log_processor.go b/handlers/analytics/log_processor.go
index bf34c4301..993f36bc8 100644
--- a/handlers/analytics/log_processor.go
+++ b/handlers/analytics/log_processor.go
@@ -1,20 +1,17 @@
 package analytics
 
 import (
-	"context"
-	"crypto/tls"
 	"encoding/json"
 	"time"
 
 	"github.com/golang/glog"
 	"github.com/livepeer/catalyst-api/metrics"
 	"github.com/segmentio/kafka-go"
-	"github.com/segmentio/kafka-go/sasl/plain"
 )
 
 const (
-	KafkaBatchInterval  = 1 * time.Second
-	KafkaRequestTimeout = 60 * time.Second
+	kafkaBatchInterval  = 1 * time.Second
+	kafkaRequestTimeout = 60 * time.Second
 )
 
 type ILogProcessor interface {
@@ -95,37 +92,18 @@ type KafkaKey struct {
 	EventType string `json:"event_type"`
 }
 
-func NewLogProcessor(bootstrapServers, user, password, topic string) (*LogProcessor, error) {
-	dialer := &kafka.Dialer{
-		Timeout: KafkaRequestTimeout,
-		SASLMechanism: plain.Mechanism{
-			Username: user,
-			Password: password,
-		},
-		DualStack: true,
-		TLS: &tls.Config{
-			MinVersion: tls.VersionTLS12,
-		},
-	}
-
-	// Create a new Kafka writer
-	writer := kafka.NewWriter(kafka.WriterConfig{
-		Brokers:  []string{bootstrapServers},
-		Topic:    topic,
-		Balancer: kafka.CRC32Balancer{},
-		Dialer:   dialer,
-	})
-
+func NewLogProcessor(bootstrapServers, user, password, topic string) *LogProcessor {
+	writer := newWriter(bootstrapServers, user, password, topic)
 	return &LogProcessor{
 		logs:   []LogData{},
 		writer: writer,
 		topic:  topic,
-	}, nil
+	}
 }
 
 // Start starts LogProcessor which sends events to Kafka in batches.
 func (lp *LogProcessor) Start(ch chan LogData) {
-	t := time.NewTicker(KafkaBatchInterval)
+	t := time.NewTicker(kafkaBatchInterval)
 	go func() {
 		for {
 			select {
@@ -156,7 +134,7 @@ func updateMetrics(d LogData) {
 }
 
 func (p *LogProcessor) sendEvents() {
-	defer p.logWriteMetrics()
+	defer logWriteMetrics(p.writer)
 
 	if len(p.logs) > 0 {
 		glog.Infof("sending analytics logs, count=%d", len(p.logs))
@@ -184,27 +162,5 @@
 	}
 	p.logs = []LogData{}
 
-	// We retry sending messages to Kafka in case of a failure
-	// We don't use any backoff, because the number of events are filling up very quickly, so in case of a failure
-	// it's better to lose analytics logs than fill up the memory and crash the whole catalyst-api
-	kafkaWriteRetries := 3
-	var err error
-	for i := 0; i < kafkaWriteRetries; i++ {
-		err = p.writer.WriteMessages(context.Background(), msgs...)
-		if err == nil {
-			return
-		} else {
-			glog.Warningf("error while sending analytics log to Kafka, retrying, try=%d, err=%v", i, err)
-		}
-	}
-	metrics.Metrics.AnalyticsMetrics.LogProcessorWriteErrors.Inc()
-	glog.Errorf("error while sending analytics log to Kafka, the analytics logs are lost, err=%d", err)
-}
-
-func (p *LogProcessor) logWriteMetrics() {
-	stats := p.writer.Stats()
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteErrors.Add(float64(stats.Errors))
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteMessages.Add(float64(stats.Messages))
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteAvgTime.Observe(stats.WriteTime.Avg.Seconds())
-	metrics.Metrics.AnalyticsMetrics.KafkaWriteRetries.Add(float64(stats.Retries))
+	sendWithRetries(p.writer, msgs)
 }
diff --git a/handlers/analytics/user_end.go b/handlers/analytics/user_end.go
index 648c6d465..949ff9a4b 100644
--- a/handlers/analytics/user_end.go
+++ b/handlers/analytics/user_end.go
@@ -3,6 +3,9 @@ package analytics
 import (
 	"context"
 	"database/sql"
+	"encoding/json"
+	"github.com/livepeer/catalyst-api/config"
+	"github.com/segmentio/kafka-go"
 	"strings"
 	"time"
 
@@ -10,30 +13,79 @@ import (
 	"github.com/livepeer/catalyst-api/handlers/misttriggers"
 )
 
-const USER_END_TABLE_NAME = "user_end_trigger"
+const (
+	userEndTableName  = "user_end_trigger"
+	channelBufferSize = 200000
+	sendInterval      = 1 * time.Second
+)
 
 type AnalyticsHandler struct {
-	db *sql.DB
+	db     *sql.DB
+	dataCh chan userEndData
+	events []userEndData
+	writer *kafka.Writer
+}
+
+type userEndData struct {
+	UUID            string `json:"uuid"`
+	TimestampMs     int64  `json:"timestamp_ms"`
+	ConnectionToken string `json:"connection_token"`
+	DownloadedBytes string `json:"downloaded_bytes"`
+	UploadedBytes   string `json:"uploaded_bytes"`
+	SessionDuration string `json:"session_duration_s"`
+	StreamID        string `json:"stream_id"`
+	StreamIDCount   int    `json:"stream_id_count"`
+	Protocol        string `json:"protocol"`
+	ProtocolCount   int    `json:"protocol_count"`
+	IPAddress       string `json:"ip_address"`
+	IPAddressCount  int    `json:"ip_address_count"`
+	Tags            string `json:"tags"`
 }
 
-func NewAnalyticsHandler(db *sql.DB) AnalyticsHandler {
-	return AnalyticsHandler{db: db}
+func NewAnalyticsHandler(cli config.Cli, db *sql.DB) AnalyticsHandler {
+	var writer *kafka.Writer
+	if cli.KafkaBootstrapServers == "" || cli.KafkaUser == "" || cli.KafkaPassword == "" || cli.UserEndKafkaTopic == "" {
+		glog.Warning("Invalid Kafka configuration for USER_END events, not using Kafka")
+	} else {
+		writer = newWriter(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.UserEndKafkaTopic)
+	}
+
+	a := AnalyticsHandler{
+		// Deprecated, we'll remove it when the Kafka setup is all in place
+		db: db,
+
+		// Used to send USER_END events to Kafka
+		dataCh: make(chan userEndData, channelBufferSize),
+		writer: writer,
+	}
+
+	a.startLoop()
+	return a
 }
 
 func (a *AnalyticsHandler) HandleUserEnd(ctx context.Context, payload *misttriggers.UserEndPayload) error {
-	// If there's nowhere to write to, this handler is a no-op
-	if a.db == nil {
-		return nil
+	if a.writer != nil {
+		// Using Kafka
+		select {
+		case a.dataCh <- toUserEndData(payload):
+			// process data async
+		default:
+			glog.Warningf("error processing USER_END trigger event, too many triggers in the buffer")
+		}
 	}
 
-	// No need to block our response to Mist; everything else in a goroutine
-	go func() {
-		defer func() {
-			if rec := recover(); rec != nil {
-				glog.Errorf("panic writing to analytics database err=%s payload=%v", rec, payload)
-			}
-		}()
-		insertDynStmt := `insert into "` + USER_END_TABLE_NAME + `"(
+	if a.db != nil {
+		// Using Postgres DB
+
+		// No need to block our response to Mist; everything else in a goroutine
+		go func() {
+			defer func() {
+				if rec := recover(); rec != nil {
+					glog.Errorf("panic writing to analytics database err=%s payload=%v", rec, payload)
+				}
+			}()
+			insertDynStmt := `insert into "` + userEndTableName + `"(
 		"uuid",
 		"timestamp_ms",
 		"connection_token",
@@ -48,27 +100,93 @@ func (a *AnalyticsHandler) HandleUserEnd(ctx context.Context, payload *misttrigg
 		"ip_address_count",
 		"tags"
 		) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`
-		_, err := a.db.Exec(
-			insertDynStmt,
-			payload.TriggerID,       // uuid
-			time.Now().UnixMilli(),  // timestamp_ms
-			payload.ConnectionToken, // connection_token
-			payload.DownloadedBytes, // downloaded_bytes
-			payload.UploadedBytes,   // uploaded_bytes
-			payload.TimeActiveSecs,  // session_duration_s
-			payload.StreamNames[len(payload.StreamNames)-1], // stream_id
-			len(payload.StreamNames),                        // stream_id_count
-			payload.Protocols[len(payload.Protocols)-1],     // protocol
-			len(payload.Protocols),                          // protocol_count
-			payload.IPs[len(payload.IPs)-1],                 // ip_address
-			len(payload.IPs),                                // ip_address_count
-			strings.Join(payload.Tags, ","),                 // tags
-
-		)
-		if err != nil {
-			glog.Errorf("error writing to analytics database err=%s payload=%v", err, payload)
+			_, err := a.db.Exec(
+				insertDynStmt,
+				payload.TriggerID,       // uuid
+				time.Now().UnixMilli(),  // timestamp_ms
+				payload.ConnectionToken, // connection_token
+				payload.DownloadedBytes, // downloaded_bytes
+				payload.UploadedBytes,   // uploaded_bytes
+				payload.TimeActiveSecs,  // session_duration_s
+				payload.StreamNames[len(payload.StreamNames)-1], // stream_id
+				len(payload.StreamNames),                        // stream_id_count
+				payload.Protocols[len(payload.Protocols)-1],     // protocol
+				len(payload.Protocols),                          // protocol_count
+				payload.IPs[len(payload.IPs)-1],                 // ip_address
+				len(payload.IPs),                                // ip_address_count
+				strings.Join(payload.Tags, ","),                 // tags
+			)
+			if err != nil {
+				glog.Errorf("error writing to analytics database err=%s payload=%v", err, payload)
+			}
+		}()
+	}
+
+	return nil
+}
+
+func (a *AnalyticsHandler) startLoop() {
+	if a.writer == nil {
+		// Not using Kafka
+		return
+	}
+
+	t := time.NewTicker(sendInterval)
+	go func() {
+		for {
+			select {
+			case d := <-a.dataCh:
+				a.events = append(a.events, d)
+			case <-t.C:
+				a.sendEvents()
+			}
 		}
 	}()
-
-	return nil
+}
+
+func (a *AnalyticsHandler) sendEvents() {
+	defer logWriteMetrics(a.writer)
+
+	if len(a.events) > 0 {
+		glog.Infof("sending USER_END events, count=%d", len(a.events))
+	} else {
+		glog.V(6).Info("no USER_END events, skip sending")
sending") + return + } + + var msgs []kafka.Message + for _, d := range a.events { + key, err := json.Marshal(KafkaKey{SessionID: d.UUID}) + if err != nil { + glog.Errorf("invalid USER_END event, cannot create Kafka key, UUID=%s, err=%v", d.UUID, err) + continue + } + value, err := json.Marshal(d) + if err != nil { + glog.Errorf("invalid USER_END event, cannot create Kafka value, UUID=%s, err=%v", d.UUID, err) + continue + } + msgs = append(msgs, kafka.Message{Key: key, Value: value}) + } + a.events = []userEndData{} + + sendWithRetries(a.writer, msgs) +} + +func toUserEndData(payload *misttriggers.UserEndPayload) userEndData { + return userEndData{ + UUID: payload.TriggerID, + TimestampMs: time.Now().UnixMilli(), + ConnectionToken: payload.ConnectionToken, + DownloadedBytes: payload.DownloadedBytes, + UploadedBytes: payload.UploadedBytes, + SessionDuration: payload.TimeActiveSecs, + StreamID: payload.StreamNames[len(payload.StreamNames)-1], + StreamIDCount: len(payload.StreamNames), + Protocol: payload.Protocols[len(payload.Protocols)-1], + ProtocolCount: len(payload.Protocols), + IPAddress: payload.IPs[len(payload.IPs)-1], + IPAddressCount: len(payload.IPs), + Tags: strings.Join(payload.Tags, ","), + } } diff --git a/handlers/misttriggers/user_end.go b/handlers/misttriggers/user_end.go index f6a59a1cf..bfcbcebf1 100644 --- a/handlers/misttriggers/user_end.go +++ b/handlers/misttriggers/user_end.go @@ -11,18 +11,18 @@ import ( // We only pass these on to the analytics pipeline, so leave as strings for now type UserEndPayload struct { - TriggerID string - ConnectionToken string - StreamNames []string - IPs []string - TimeActiveSecs string - UploadedBytes string - DownloadedBytes string - Tags []string - PerIPSecs []string - PerProtocolSecs []string - PerStreamSecs []string - SessionID string + TriggerID string `json:"trigger_id"` + ConnectionToken string `json:"connection_token"` + StreamNames []string `json:"stream_names"` + IPs []string `json:"ips"` + TimeActiveSecs string `json:"time_active_secs"` + UploadedBytes string `json:"uploaded_bytes"` + DownloadedBytes string `json:"downloaded_bytes"` + Tags []string `json:"tags"` + PerIPSecs []string `json:"per_ip_secs"` + PerProtocolSecs []string `json:"per_protocol_secs"` + PerStreamSecs []string `json:"per_stream_secs"` + SessionID string `json:"session_id"` /* Protocols is a list of the protocols in use for the "user" session. Values can be (not exhaustive): diff --git a/main.go b/main.go index 52c0b80db..9bd2b5869 100644 --- a/main.go +++ b/main.go @@ -132,6 +132,7 @@ func main() { fs.StringVar(&cli.KafkaUser, "kafka-user", "", "Kafka Username") fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password") fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs") + fs.StringVar(&cli.UserEndKafkaTopic, "user-end-kafka-topic", "", "Kafka Topic used to send USER_END events") fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "", "Endpoint to get the current members in the cluster") fs.StringVar(&cli.EventsEndpoint, "events-endpoint", "", "Endpoint to send proxied events from catalyst-api into catalyst") fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api")