Skip to content

Commit

Permalink
Telemetry: Deduplicate log entries
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Nov 26, 2024
1 parent 96e557a commit 7a47b59
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 56 deletions.
71 changes: 57 additions & 14 deletions lib/datadog/core/telemetry/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ def type
# Default payload: returns an empty Hash.
def payload
{}
end

# Override equality to allow for deduplication.
# The basic implementation checks that the other object has exactly the same class
# as the receiver. An exact match (rather than `is_a?`) keeps equality symmetric and
# consistent with `hash`, which is derived from `self.class`: a parent-class instance
# must not compare equal to a subclass instance whose hash differs.
# This works for events that have no attributes.
# For events with attributes, you should override this method to compare the attributes.
#
# @param other [Object] the object to compare against
# @return [Boolean] true when `other` is an instance of exactly the receiver's class
def ==(other)
  other.instance_of?(self.class)
end

# @see #==
alias eql? ==

# @see #==
# Hash code paired with #==: the value depends only on the receiver's class,
# so all instances of one class share the same hash.
#
# @return [Integer]
def hash
  klass = self.class
  klass.hash
end
end

# Telemetry class for the 'app-started' event
Expand Down Expand Up @@ -263,6 +279,8 @@ def patch_error(integration)

# Telemetry class for the 'app-client-configuration-change' event
class AppClientConfigurationChange < Base
attr_reader :changes, :origin

# Returns the type string for this event: 'app-client-configuration-change'.
def type
'app-client-configuration-change'
end
Expand Down Expand Up @@ -301,6 +319,16 @@ def configuration

res
end

# Equality used for deduplication: true only when `other` is an
# AppClientConfigurationChange whose changes and origin both match the receiver's.
#
# @param other [Object]
# @return [Boolean]
def ==(other)
  return false unless other.is_a?(AppClientConfigurationChange)

  other.changes == @changes && other.origin == @origin
end

alias eql? ==

# Hash code paired with #==: built from the receiver's class, its changes
# and its origin.
#
# @return [Integer]
def hash
  identity = [self.class, @changes, @origin]
  identity.hash
end
end

# Telemetry class for the 'app-heartbeat' event
Expand All @@ -315,20 +343,12 @@ class AppClosing < Base
# Returns the type string for this event: 'app-closing'.
def type
'app-closing'
end

def ==(other)
other.is_a?(AppClosing)
end

alias eql? ==

def hash
self.class.hash
end
end

# Telemetry class for the 'generate-metrics' event
class GenerateMetrics < Base
attr_reader :namespace, :metric_series

# Returns the type string for this event: 'generate-metrics'.
def type
'generate-metrics'
end
Expand All @@ -345,9 +365,20 @@ def payload
series: @metric_series.map(&:to_h)
}
end

# Equality used for deduplication: true only when `other` has exactly the
# receiver's class and the same namespace and metric series.
#
# The class check is exact (`instance_of?`) rather than `is_a?(GenerateMetrics)`:
# `hash` mixes in `self.class`, so a subclass instance comparing equal to a
# GenerateMetrics instance would be `eql?` while hashing differently, and would
# conflate two different event kinds.
#
# @param other [Object]
# @return [Boolean]
def ==(other)
  other.instance_of?(self.class) &&
    other.namespace == @namespace &&
    other.metric_series == @metric_series
end

alias eql? ==

# Hash code paired with #==: built from the receiver's class, its namespace
# and its metric series.
#
# @return [Integer]
def hash
  identity = [self.class, @namespace, @metric_series]
  identity.hash
end
end

# Telemetry class for the 'logs' event
# Telemetry class for the 'logs' event.
# Logs with the same content are deduplicated at flush time.
class Log < Base
LEVELS = {
error: 'ERROR',
Expand All @@ -367,7 +398,7 @@ def initialize(message:, level:, stack_trace: nil, count: 1)
@message = message
@stack_trace = stack_trace

if LEVELS_STRING.include?(level)
if level.is_a?(String) && LEVELS_STRING.include?(level)
@level = level
elsif level.is_a?(Symbol)
@level = LEVELS.fetch(level) { |k| raise ArgumentError, "Invalid log level :#{k}" }
Expand All @@ -393,13 +424,15 @@ def payload

# override equality to allow for deduplication
def ==(other)
other.is_a?(Log) && other.message == @message && other.level == @level && other.stack_trace == @stack_trace && other.count == @count
other.is_a?(Log) &&
other.message == @message &&
other.level == @level && other.stack_trace == @stack_trace && other.count == @count
end

alias eql? ==

def hash
[@message, @level, @stack_trace, @count].hash
[self.class, @message, @level, @stack_trace, @count].hash
end
end

Expand Down Expand Up @@ -430,6 +463,16 @@ def payload
}
end
end

# Equality used for deduplication: true only when `other` is a MessageBatch
# holding events equal to the receiver's.
#
# @param other [Object]
# @return [Boolean]
def ==(other)
  return false unless other.is_a?(MessageBatch)

  other.events == @events
end

alias eql? ==

# Hash code paired with #==: built from the receiver's class and its events.
#
# @return [Integer]
def hash
  identity = [self.class, @events]
  identity.hash
end
end
end
end
Expand Down
22 changes: 22 additions & 0 deletions lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def to_h
}
end

# Equality used for deduplication: true only when `other` has exactly the
# receiver's class and the same name, values, tags, common flag and type.
#
# The class check is exact (`instance_of?`) rather than `is_a?(self.class)`:
# `is_a?` is asymmetric across a class hierarchy (parent == child but not
# child == parent) and disagrees with `hash`, which mixes in `self.class`.
#
# @param other [Object]
# @return [Boolean]
def ==(other)
  other.instance_of?(self.class) &&
    name == other.name &&
    values == other.values &&
    tags == other.tags &&
    common == other.common &&
    type == other.type
end

alias eql? ==

# Hash code paired with #==: built from the receiver's class, name, values,
# tags, common flag and type.
#
# @return [Integer]
def hash
  identity = [self.class, name, values, tags, common, type]
  identity.hash
end

private

def tags_to_array(tags)
Expand Down Expand Up @@ -71,6 +83,16 @@ def to_h
res[:interval] = interval
res
end

# Equality used for deduplication: defers to the parent implementation first,
# then additionally requires the same interval.
#
# @param other [Object]
# @return [Boolean]
def ==(other)
  inherited = super
  # Preserve the parent's own falsy result when it rejects the comparison.
  return inherited unless inherited

  interval == other.interval
end

alias eql? ==

# Hash code paired with #==: combines the parent's hash with the interval.
#
# @return [Integer]
def hash
  inherited = super
  [inherited, interval].hash
end
end

# Count metric adds up all the submitted values in a time interval. This would be suitable for a
Expand Down
18 changes: 10 additions & 8 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,33 +170,35 @@ def disable_on_not_found!(response)
disable!
end

private

# Deduplicate logs by counting the number of occurrences of each log
# entry and replacing them with a single entry with the count.
# Other events are passed through unchanged.
def deduplicate_logs(events)
return if events.empty?
return events if events.empty?

logs = []
all_logs = []
other_events = events.reject do |event|
if event.is_a?(Event::Log)
logs << event
all_logs << event
true
else
false
end
end

return if logs.empty?
return events if all_logs.empty?

logs = logs.group_by(&:hash).map do |_, logs|
uniq_logs = all_logs.group_by(&:itself).map do |_, logs|
log = logs.first
if logs.size > 1
# New log event with a count of repeated occurrences
Event::Log.new(message: log.message, level: log.level, stack_trace: log.stack_trace, count: logs.size)
else
log
end
end

other_events + logs
other_events + uniq_logs
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/telemetry/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Datadog

def disable!: () -> void

def client_configuration_change!: (Enumerable[[String, Numeric | bool | String]] changes) -> void
def client_configuration_change!: (Array[[String, Numeric | bool | String]] changes) -> void

def emit_closing!: () -> void

Expand Down
26 changes: 14 additions & 12 deletions sig/datadog/core/telemetry/event.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ module Datadog
end

class AppClientConfigurationChange < Base
@changes: Enumerable[[String, Numeric | bool | String | int]]
@origin: String
attr_reader changes: Array[[String, Numeric | bool | String | int]]
attr_reader origin: String

def initialize: (Enumerable[[String, Numeric | bool | String]] changes, String origin) -> void
def initialize: (Array[[String, Numeric | bool | String]] changes, String origin) -> void

def configuration: () -> Array[Hash[Symbol, untyped]]
end
Expand All @@ -61,23 +61,25 @@ module Datadog
end

class GenerateMetrics < Base
@namespace: String
@metric_series: Enumerable[Datadog::Core::Telemetry::Metric::Base]
attr_reader namespace: String
attr_reader metric_series: Array[Metric::Base]

def initialize: (String namespace, Enumerable[Datadog::Core::Telemetry::Metric::Base] metric_series) -> void
def initialize: (String namespace, Array[Metric::Base] metric_series) -> void
end

class Log < Base
LEVELS: Hash[Symbol, String]

@count: Integer
@message: String
@level: "ERROR" | "DEBUG" | "WARN"
@stack_trace: String?
LEVELS_STRING: Array[String]

def initialize: (message: String, level: Symbol, ?stack_trace: String?) -> void
attr_reader count: Integer
attr_reader message: String
attr_reader level: String
attr_reader stack_trace: String?

def payload: () -> { logs: [Hash[Symbol, String]] }
def initialize: (message: String, level: (Symbol|String), ?stack_trace: String?, ?count: Integer) -> void

def payload: () -> { logs: [Hash[Symbol, (String|Integer)]] }
end

class Distributions < GenerateMetrics
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module Datadog

private

def deduplicate_logs: (Array[Event::Base] events) -> Array[Event::Base]

def heartbeat!: () -> void

def started!: () -> void
Expand Down
Loading

0 comments on commit 7a47b59

Please sign in to comment.