Skip to content

Commit

Permalink
Add event source id to all events. Fixes #2619
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jan 14, 2025
1 parent a33dd4e commit 71c7f6b
Show file tree
Hide file tree
Showing 31 changed files with 75 additions and 32 deletions.
1 change: 1 addition & 0 deletions controller/event/api_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const ApiSessionTypeJwt = "jwt"
type ApiSessionEvent struct {
Namespace string `json:"namespace"`
EventType string `json:"event_type"`
EventSrcId string `json:"event_src_id"`
Id string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Expand Down
1 change: 1 addition & 0 deletions controller/event/circuits.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type CircuitEvent struct {
Namespace string `json:"namespace"`
Version uint32 `json:"version"`
EventType CircuitEventType `json:"event_type"`
EventSrcId string `json:"event_src_id"`
CircuitId string `json:"circuit_id"`
Timestamp time.Time `json:"timestamp"`
ClientId string `json:"client_id"`
Expand Down
13 changes: 7 additions & 6 deletions controller/event/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ func (self *ClusterPeer) String() string {
}

type ClusterEvent struct {
Namespace string `json:"namespace"`
EventType ClusterEventType `json:"eventType"`
Timestamp time.Time `json:"timestamp"`
Index uint64 `json:"index,omitempty"`
Peers []*ClusterPeer `json:"peers,omitempty"`
LeaderId string `json:"leaderId,omitempty"`
Namespace string `json:"namespace"`
EventType ClusterEventType `json:"eventType"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
Index uint64 `json:"index,omitempty"`
Peers []*ClusterPeer `json:"peers,omitempty"`
LeaderId string `json:"leaderId,omitempty"`
}

func (event *ClusterEvent) String() string {
Expand Down
17 changes: 9 additions & 8 deletions controller/event/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ const (
)

type ConnectEvent struct {
Namespace string `json:"namespace"`
SrcType ConnectSource `json:"src_type"`
DstType ConnectDestination `json:"dst_type"`
SrcId string `json:"src_id"`
SrcAddr string `json:"src_addr"`
DstId string `json:"dst_id"`
DstAddr string `json:"dst_addr"`
Timestamp time.Time `json:"timestamp"`
Namespace string `json:"namespace"`
EventSrcId string `json:"event_src_id"`
SrcType ConnectSource `json:"src_type"`
DstType ConnectDestination `json:"dst_type"`
SrcId string `json:"src_id"`
SrcAddr string `json:"src_addr"`
DstId string `json:"dst_id"`
DstAddr string `json:"dst_addr"`
Timestamp time.Time `json:"timestamp"`
}

type ConnectEventHandler interface {
Expand Down
1 change: 1 addition & 0 deletions controller/event/entity_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EntityChangeEvent struct {
Namespace string `json:"namespace"`
EventId string `json:"eventId"`
EventType EntityChangeEventType `json:"eventType"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
EntityType string `json:"entityType,omitempty"`
Expand Down
9 changes: 5 additions & 4 deletions controller/event/entity_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
const EntityCountEventNS = "edge.entityCounts"

type EntityCountEvent struct {
Namespace string `json:"namespace"`
Timestamp time.Time `json:"timestamp"`
Counts map[string]int64 `json:"counts"`
Error string `json:"error"`
Namespace string `json:"namespace"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
Counts map[string]int64 `json:"counts"`
Error string `json:"error"`
}

func (event *EntityCountEvent) String() string {
Expand Down
1 change: 1 addition & 0 deletions controller/event/links.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type LinkConnection struct {
type LinkEvent struct {
Namespace string `json:"namespace"`
EventType LinkEventType `json:"event_type"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
LinkId string `json:"link_id"`
SrcRouterId string `json:"src_router_id"`
Expand Down
1 change: 1 addition & 0 deletions controller/event/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
type MetricsEvent struct {
MetricType string `json:"metric_type" mapstructure:"metric_type"`
Namespace string `json:"namespace"`
EventSrcId string `json:"event_src_id"`
SourceAppId string `json:"source_id" mapstructure:"source_id"`
SourceEntityId string `json:"source_entity_id,omitempty" mapstructure:"source_entity_id,omitempty"`
Version uint32 `json:"version"`
Expand Down
1 change: 1 addition & 0 deletions controller/event/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
type RouterEvent struct {
Namespace string `json:"namespace"`
EventType RouterEventType `json:"event_type"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
RouterId string `json:"router_id"`
RouterOnline bool `json:"router_online"`
Expand Down
1 change: 1 addition & 0 deletions controller/event/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
type SdkEvent struct {
Namespace string `json:"namespace"`
EventType SdkEventType `json:"event_type"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
IdentityId string `json:"identity_id"`
}
Expand Down
1 change: 1 addition & 0 deletions controller/event/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ServiceEvent struct {
Namespace string `json:"namespace"`
Version uint32 `json:"version"`
EventType string `json:"event_type"`
EventSrcId string `json:"event_src_id"`
ServiceId string `json:"service_id"`
TerminatorId string `json:"terminator_id"`
Count uint64 `json:"count"`
Expand Down
1 change: 1 addition & 0 deletions controller/event/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const SessionEventNS = "edge.sessions"
type SessionEvent struct {
Namespace string `json:"namespace"`
EventType string `json:"event_type"`
EventSrcId string `json:"event_src_id"`
SessionType string `json:"session_type"`
Id string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Expand Down
1 change: 1 addition & 0 deletions controller/event/terminators.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
type TerminatorEvent struct {
Namespace string `json:"namespace"`
EventType TerminatorEventType `json:"event_type"`
EventSrcId string `json:"event_src_id"`
Timestamp time.Time `json:"timestamp"`
ServiceId string `json:"service_id"`
TerminatorId string `json:"terminator_id"`
Expand Down
2 changes: 2 additions & 0 deletions controller/event/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type UsageEvent struct {
Namespace string `json:"namespace"`
Version uint32 `json:"version"`
EventType string `json:"event_type"`
EventSrcId string `json:"event_src_id"`
SourceId string `json:"source_id"`
CircuitId string `json:"circuit_id"`
Usage uint64 `json:"usage"`
Expand All @@ -31,6 +32,7 @@ type UsageEventHandler interface {
type UsageEventV3 struct {
Namespace string `json:"namespace"`
Version uint32 `json:"version"`
EventSrcId string `json:"event_src_id"`
SourceId string `json:"source_id"`
CircuitId string `json:"circuit_id"`
Usage map[string]uint64 `json:"usage"`
Expand Down
2 changes: 2 additions & 0 deletions controller/events/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func NewDispatcher(closeNotify <-chan struct{}) *Dispatcher {
var _ event.Dispatcher = (*Dispatcher)(nil)

type Dispatcher struct {
ctrlId string
circuitEventHandlers concurrenz.CopyOnWriteSlice[event.CircuitEventHandler]
entityChangeEventHandlers concurrenz.CopyOnWriteSlice[event.EntityChangeEventHandler]
linkEventHandlers concurrenz.CopyOnWriteSlice[event.LinkEventHandler]
Expand Down Expand Up @@ -118,6 +119,7 @@ type Dispatcher struct {

func (self *Dispatcher) InitializeNetworkEvents(n *network.Network) {
self.network = n
self.ctrlId = n.GetAppId()
self.initMetricsEvents(n)
self.initRouterEvents(n)
self.initServiceEvents(n)
Expand Down
2 changes: 2 additions & 0 deletions controller/events/dispatcher_api_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (self *Dispatcher) apiSessionCreated(apiSession *db.ApiSession) {
evt := &event.ApiSessionEvent{
Namespace: event.ApiSessionEventNS,
EventType: event.ApiSessionEventTypeCreated,
EventSrcId: self.ctrlId,
Id: apiSession.Id,
Type: event.ApiSessionTypeLegacy,
Timestamp: time.Now(),
Expand All @@ -73,6 +74,7 @@ func (self *Dispatcher) apiSessionDeleted(apiSession *db.ApiSession) {
evt := &event.ApiSessionEvent{
Namespace: event.ApiSessionEventNS,
EventType: event.ApiSessionEventTypeDeleted,
EventSrcId: self.ctrlId,
Id: apiSession.Id,
Type: event.ApiSessionTypeLegacy,
Timestamp: time.Now(),
Expand Down
1 change: 1 addition & 0 deletions controller/events/dispatcher_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (self *Dispatcher) RemoveClusterEventHandler(handler event.ClusterEventHand
}

func (self *Dispatcher) AcceptClusterEvent(event *event.ClusterEvent) {
event.EventSrcId = self.ctrlId
go func() {
for _, handler := range self.clusterEventHandlers.Value() {
handler.AcceptClusterEvent(event)
Expand Down
1 change: 1 addition & 0 deletions controller/events/dispatcher_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (self *Dispatcher) RemoveConnectEventHandler(handler event.ConnectEventHand
}

func (self *Dispatcher) AcceptConnectEvent(evt *event.ConnectEvent) {
evt.EventSrcId = self.ctrlId
for _, handler := range self.connectEventHandlers.Value() {
go handler.AcceptConnectEvent(evt)
}
Expand Down
5 changes: 4 additions & 1 deletion controller/events/dispatcher_entity_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (self *entityChangeEventDispatcher) ProcessPreCommit(state boltz.UntypedEnt
Namespace: event.EntityChangeEventsNs,
EventId: state.GetEventId(),
EventType: changeType,
EventSrcId: self.dispatcher.ctrlId,
EntityType: state.GetStore().GetEntityType(),
IsParentEvent: &isParentEvent,
Timestamp: time.Now(),
Expand Down Expand Up @@ -265,9 +266,10 @@ func (self *entityChangeEventDispatcher) ProcessPreCommit(state boltz.UntypedEnt
func (self *entityChangeEventDispatcher) emitRecoveryEvent(eventId string, entityType string) {
evt := &event.EntityChangeEvent{
Namespace: event.EntityChangeEventsNs,
EventType: event.EntityChangeTypeCommitted,
EventSrcId: self.dispatcher.ctrlId,
EventId: eventId,
EntityType: entityType,
EventType: event.EntityChangeTypeCommitted,
Timestamp: time.Now(),
IsRecoveryEvent: true,
}
Expand All @@ -280,6 +282,7 @@ func (self *entityChangeEventDispatcher) ProcessPostCommit(state boltz.UntypedEn
Namespace: event.EntityChangeEventsNs,
EventId: state.GetEventId(),
EventType: event.EntityChangeTypeCommitted,
EventSrcId: self.dispatcher.ctrlId,
EntityType: state.GetStore().GetEntityType(),
Timestamp: time.Now(),
IsParentEvent: &isParentEvent,
Expand Down
5 changes: 3 additions & 2 deletions controller/events/dispatcher_entity_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func (self *Dispatcher) generateEntityEvents() {

func (self *Dispatcher) generateEntityCountEvent() *event.EntityCountEvent {
event := &event.EntityCountEvent{
Namespace: event.EntityCountEventNS,
Timestamp: time.Now(),
Namespace: event.EntityCountEventNS,
EventSrcId: self.ctrlId,
Timestamp: time.Now(),
}

data, err := self.stores.GetEntityCounts(self.network.GetDb())
Expand Down
3 changes: 2 additions & 1 deletion controller/events/dispatcher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func (self *Dispatcher) unregisterMetricsEventHandler(val interface{}) {

func (self *Dispatcher) newMetricEvent(msg *metrics_pb.MetricsMessage, metricType string, name string, id string) *event.MetricsEvent {
result := &event.MetricsEvent{
MetricType: metricType,
Namespace: event.MetricsEventsNs,
EventSrcId: self.ctrlId,
MetricType: metricType,
SourceAppId: msg.SourceId,
Timestamp: msg.Timestamp.AsTime(),
Metric: name,
Expand Down
3 changes: 2 additions & 1 deletion controller/events/dispatcher_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (self *routerEventAdapter) RouterDisconnected(r *model.Router) {
func (self *routerEventAdapter) routerChange(eventType event.RouterEventType, r *model.Router, online bool) {
evt := &event.RouterEvent{
Namespace: event.RouterEventsNs,
EventSrcId: self.ctrlId,
EventType: eventType,
Timestamp: time.Now(),
RouterId: r.Id,
Expand All @@ -104,7 +105,7 @@ func (self *routerEventAdapter) routerChange(eventType event.RouterEventType, r
DstType: event.ConnectDestinationController,
SrcId: r.Id,
SrcAddr: srcAddr,
DstId: self.Dispatcher.network.GetAppId(),
DstId: self.Dispatcher.ctrlId,
DstAddr: dstAddr,
Timestamp: time.Now(),
}
Expand Down
1 change: 1 addition & 0 deletions controller/events/dispatcher_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (self *Dispatcher) RemoveSdkEventHandler(handler event.SdkEventHandler) {
}

func (self *Dispatcher) AcceptSdkEvent(evt *event.SdkEvent) {
evt.EventSrcId = self.ctrlId
for _, handler := range self.sdkEventHandlers.Value() {
go handler.AcceptSdkEvent(evt)
}
Expand Down
1 change: 1 addition & 0 deletions controller/events/dispatcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (self *serviceEventAdapter) AcceptMetrics(message *metrics_pb.MetricsMessag
Namespace: "service.events",
Version: 2,
EventType: name,
EventSrcId: self.ctrlId,
ServiceId: serviceId,
TerminatorId: terminatorId,
Count: count,
Expand Down
2 changes: 2 additions & 0 deletions controller/events/dispatcher_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (self *Dispatcher) sessionCreated(session *db.Session) {
evt := &event.SessionEvent{
Namespace: event.SessionEventNS,
EventType: event.SessionEventTypeCreated,
EventSrcId: self.ctrlId,
Id: session.Id,
SessionType: session.Type,
Timestamp: time.Now(),
Expand All @@ -70,6 +71,7 @@ func (self *Dispatcher) sessionDeleted(session *db.Session) {
evt := &event.SessionEvent{
Namespace: event.SessionEventNS,
EventType: event.SessionEventTypeDeleted,
EventSrcId: self.ctrlId,
Id: session.Id,
SessionType: session.Type,
Timestamp: time.Now(),
Expand Down
1 change: 1 addition & 0 deletions controller/events/dispatcher_terminator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (self *terminatorEventAdapter) createTerminatorEvent(eventType event.Termin
evt := &event.TerminatorEvent{
Namespace: event.TerminatorEventsNs,
EventType: eventType,
EventSrcId: self.Dispatcher.ctrlId,
Timestamp: time.Now(),
ServiceId: terminator.Service,
TerminatorId: terminator.Id,
Expand Down
14 changes: 9 additions & 5 deletions controller/events/dispatcher_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa
evt := &event.UsageEvent{
Namespace: event.UsageEventsNs,
Version: event.UsageEventsVersion,
EventSrcId: self.dispatcher.ctrlId,
EventType: name,
SourceId: message.SourceId,
CircuitId: circuitId,
Expand All @@ -169,8 +170,9 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa
for usageType, usage := range bucket.Values {
evt := &event.UsageEvent{
Namespace: event.UsageEventsNs,
Version: 2,
Version: event.UsageEventsVersion,
EventType: "usage." + usageType,
EventSrcId: self.dispatcher.ctrlId,
SourceId: message.SourceId,
CircuitId: circuitId,
Usage: usage,
Expand All @@ -189,10 +191,11 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa
for _, bucket := range interval.Buckets {
for circuitId, usage := range bucket.Values {
evt := &event.UsageEventV3{
Namespace: event.UsageEventsNs,
Version: 3,
SourceId: message.SourceId,
CircuitId: circuitId,
Namespace: event.UsageEventsNs,
Version: 3,
EventSrcId: self.dispatcher.ctrlId,
SourceId: message.SourceId,
CircuitId: circuitId,
Usage: map[string]uint64{
name: usage,
},
Expand All @@ -210,6 +213,7 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa
Namespace: event.UsageEventsNs,
Version: 3,
SourceId: message.SourceId,
EventSrcId: self.dispatcher.ctrlId,
CircuitId: circuitId,
Usage: bucket.Values,
IntervalStartUTC: interval.IntervalStartUTC,
Expand Down
1 change: 1 addition & 0 deletions controller/events/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func Test_FilterMetrics(t *testing.T) {
closeNotify := make(chan struct{})
defer close(closeNotify)
dispatcher := NewDispatcher(closeNotify)
dispatcher.ctrlId = "ctrl1"

unfilteredEventC := make(chan *event.MetricsEvent, 1)
adapter := dispatcher.NewFilteredMetricsAdapter(nil, nil, event.MetricsEventHandlerF(func(evt *event.MetricsEvent) {
Expand Down
11 changes: 7 additions & 4 deletions controller/network/assembly.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (network *Network) NotifyLinkEvent(link *model.Link, eventType event.LinkEv
linkEvent := &event.LinkEvent{
Namespace: event.LinkEventsNs,
EventType: eventType,
EventSrcId: network.GetAppId(),
Timestamp: time.Now(),
LinkId: link.Id,
SrcRouterId: link.Src.Id,
Expand All @@ -92,6 +93,7 @@ func (network *Network) NotifyLinkConnected(link *model.Link, msg *ctrl_pb.LinkC
linkEvent := &event.LinkEvent{
Namespace: event.LinkEventsNs,
EventType: event.LinkConnected,
EventSrcId: network.GetAppId(),
Timestamp: time.Now(),
LinkId: link.Id,
SrcRouterId: link.Src.Id,
Expand All @@ -114,10 +116,11 @@ func (network *Network) NotifyLinkConnected(link *model.Link, msg *ctrl_pb.LinkC

func (network *Network) NotifyLinkIdEvent(linkId string, eventType event.LinkEventType) {
linkEvent := &event.LinkEvent{
Namespace: event.LinkEventsNs,
EventType: eventType,
Timestamp: time.Now(),
LinkId: linkId,
Namespace: event.LinkEventsNs,
EventType: eventType,
EventSrcId: network.GetAppId(),
Timestamp: time.Now(),
LinkId: linkId,
}
network.eventDispatcher.AcceptLinkEvent(linkEvent)
}
Expand Down
Loading

0 comments on commit 71c7f6b

Please sign in to comment.