Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry: Deduplicate log entries #4154

Open
wants to merge 4 commits into
base: disable-env-logs
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions lib/datadog/core/telemetry/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ def type
# Payload sent with the event.
# This default implementation returns an empty Hash.
#
# @return [Hash]
def payload
{}
end

# Equality used to deduplicate events.
# The base implementation treats two events as equal when they are instances of
# exactly the same class, which is sufficient for events that carry no attributes.
# Events with attributes should override this method to compare those attributes.
#
# An exact-class check (rather than `is_a?`) keeps equality symmetric between a
# class and its subclasses, and consistent with #hash, which is derived from
# `self.class`.
#
# @param other [Object] object to compare against
# @return [Boolean] true when `other` is an instance of the receiver's exact class
def ==(other)
  other.instance_of?(self.class)
end

# @see #==
alias eql? ==

# @see #==
# Hash value derived only from the receiver's class, so every instance of the
# same class produces the same hash.
#
# @return [Integer]
def hash
self.class.hash
end
end

# Telemetry class for the 'app-started' event
Expand Down Expand Up @@ -263,6 +279,8 @@ def patch_error(integration)

# Telemetry class for the 'app-client-configuration-change' event
class AppClientConfigurationChange < Base
attr_reader :changes, :origin

# Event type identifier for this class.
#
# @return [String] always 'app-client-configuration-change'
def type
'app-client-configuration-change'
end
Expand Down Expand Up @@ -301,6 +319,16 @@ def configuration

res
end

# Two configuration-change events are equal when both are
# AppClientConfigurationChange instances with equal `changes` and `origin`.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  return false unless other.is_a?(AppClientConfigurationChange)

  other.changes == @changes &&
    other.origin == @origin
end

alias eql? ==

# Hash built from the class plus the @changes and @origin instance variables,
# the same two attributes that #== compares.
#
# @return [Integer]
def hash
[self.class, @changes, @origin].hash
end
end

# Telemetry class for the 'app-heartbeat' event
Expand All @@ -319,6 +347,8 @@ def type

# Telemetry class for the 'generate-metrics' event
class GenerateMetrics < Base
attr_reader :namespace, :metric_series

# Event type identifier for this class.
#
# @return [String] always 'generate-metrics'
def type
'generate-metrics'
end
Expand All @@ -335,24 +365,48 @@ def payload
series: @metric_series.map(&:to_h)
}
end

# Two metric events are equal when they are instances of exactly the same
# class and have equal `namespace` and `metric_series`.
#
# The class check is exact (not `is_a?(GenerateMetrics)`) so that an event of
# this class is never considered equal to an instance of a subclass holding the
# same data: #hash includes `self.class`, and equal objects must share a hash.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  return false unless other.instance_of?(self.class)

  other.namespace == @namespace && other.metric_series == @metric_series
end

alias eql? ==

# Hash built from the class plus the @namespace and @metric_series instance
# variables, the same two attributes that #== compares.
#
# @return [Integer]
def hash
[self.class, @namespace, @metric_series].hash
end
end

# Telemetry class for the 'logs' event.
# Logs with the same content are deduplicated at flush time.
class Log < Base
LEVELS = {
error: 'ERROR',
warn: 'WARN',
}.freeze

LEVELS_STRING = LEVELS.values.freeze

attr_reader :message, :level, :stack_trace, :count

# Event type identifier for this class.
#
# @return [String] always 'logs'
def type
'logs'
end

# Builds a log event.
#
# @param message [String] log message
# @param level [Symbol, String] either a key of LEVELS (:error, :warn) or one of
#   the already-mapped strings listed in LEVELS_STRING ('ERROR', 'WARN')
# @param stack_trace [String, nil] optional stack trace
# @param count [Integer] number of occurrences this event stands for
# @raise [ArgumentError] when `level` is not a known Symbol or String level
def initialize(message:, level:, stack_trace: nil, count: 1)
  super()
  @message = message
  @stack_trace = stack_trace

  # Strings are accepted as-is so an event can be rebuilt from another event's
  # already-mapped #level; Symbols are mapped through LEVELS.
  @level =
    if level.is_a?(String) && LEVELS_STRING.include?(level)
      level
    elsif level.is_a?(Symbol)
      LEVELS.fetch(level) { |k| raise ArgumentError, "Invalid log level :#{k}" }
    else
      raise ArgumentError, "Invalid log level #{level}"
    end

  @count = count
end

def payload
Expand All @@ -362,10 +416,24 @@ def payload
message: @message,
level: @level,
stack_trace: @stack_trace,
count: @count,
}.compact
]
}
end

# Equality used for deduplication: another Log is equal when its message,
# level, stack trace and count all match this one's.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  return false unless other.is_a?(Log)

  other.message == @message &&
    other.level == @level &&
    other.stack_trace == @stack_trace &&
    other.count == @count
end

alias eql? ==

# Hash built from the class plus @message, @level, @stack_trace and @count,
# the same attributes that #== compares.
#
# @return [Integer]
def hash
[self.class, @message, @level, @stack_trace, @count].hash
end
end

# Telemetry class for the 'distributions' event
Expand Down Expand Up @@ -395,6 +463,16 @@ def payload
}
end
end

# Two batches are equal when both are MessageBatch instances holding equal
# `events`.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  return false unless other.is_a?(MessageBatch)

  other.events == @events
end

alias eql? ==

# Hash built from the class plus the @events instance variable, the attribute
# that #== compares.
#
# @return [Integer]
def hash
[self.class, @events].hash
end
end
end
end
Expand Down
22 changes: 22 additions & 0 deletions lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def to_h
}
end

# Metrics are equal when `other` is an instance of the receiver's class (or a
# subclass) and name, values, tags, common and type all match.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  return false unless other.is_a?(self.class)

  name == other.name &&
    values == other.values &&
    tags == other.tags &&
    common == other.common &&
    type == other.type
end

alias eql? ==

# Hash built from the class plus name, values, tags, common and type, the same
# attributes that #== compares.
#
# @return [Integer]
def hash
[self.class, name, values, tags, common, type].hash
end

private

def tags_to_array(tags)
Expand Down Expand Up @@ -71,6 +83,16 @@ def to_h
res[:interval] = interval
res
end

# Extends the inherited equality with a comparison of `interval`.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  inherited_match = super
  inherited_match && interval == other.interval
end

alias eql? ==

# Hash combining the inherited hash value with `interval`, the extra attribute
# that this class's #== compares.
#
# @return [Integer]
def hash
[super, interval].hash
end
end

# Count metric adds up all the submitted values in a time interval. This would be suitable for a
Expand Down
33 changes: 33 additions & 0 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ def flush_events(events)
return if events.empty?
return if !enabled? || !sent_started_event?

events = deduplicate_logs(events)

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
send_event(Event::MessageBatch.new(events))
end
Expand Down Expand Up @@ -167,6 +169,37 @@ def disable_on_not_found!(response)
Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
disable!
end

# Collapses repeated log events into a single log event carrying the total
# number of occurrences. Non-log events are passed through unchanged.
#
# Logs are grouped by equality (`eql?`/`hash` of the log events). A group of
# one is kept as-is; a larger group is replaced by a new Event::Log whose
# count is the sum of the counts of the grouped logs, so occurrences already
# folded into an entry are not lost.
#
# @param events [Array] events about to be flushed
# @return [Array] the non-log events in their original order followed by the
#   deduplicated logs; the input array itself when it is empty or has no logs
def deduplicate_logs(events)
  return events if events.empty?

  all_logs, other_events = events.partition { |event| event.is_a?(Event::Log) }
  return events if all_logs.empty?

  uniq_logs = all_logs.group_by(&:itself).map do |log, occurrences|
    next log if occurrences.size == 1

    # New log event standing for every repeated occurrence in the group
    Event::Log.new(
      message: log.message,
      level: log.level,
      stack_trace: log.stack_trace,
      count: occurrences.sum(&:count)
    )
  end

  other_events + uniq_logs
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/telemetry/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Datadog

def disable!: () -> void

def client_configuration_change!: (Enumerable[[String, Numeric | bool | String]] changes) -> void
def client_configuration_change!: (Array[[String, Numeric | bool | String]] changes) -> void

def emit_closing!: () -> void

Expand Down
25 changes: 14 additions & 11 deletions sig/datadog/core/telemetry/event.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ module Datadog
end

class AppClientConfigurationChange < Base
@changes: Enumerable[[String, Numeric | bool | String | int]]
@origin: String
attr_reader changes: Array[[String, Numeric | bool | String | int]]
attr_reader origin: String

def initialize: (Enumerable[[String, Numeric | bool | String]] changes, String origin) -> void
def initialize: (Array[[String, Numeric | bool | String]] changes, String origin) -> void

def configuration: () -> Array[Hash[Symbol, untyped]]
end
Expand All @@ -61,22 +61,25 @@ module Datadog
end

class GenerateMetrics < Base
@namespace: String
@metric_series: Enumerable[Datadog::Core::Telemetry::Metric::Base]
attr_reader namespace: String
attr_reader metric_series: Array[Metric::Base]

def initialize: (String namespace, Enumerable[Datadog::Core::Telemetry::Metric::Base] metric_series) -> void
def initialize: (String namespace, Array[Metric::Base] metric_series) -> void
end

class Log < Base
LEVELS: Hash[Symbol, String]

@message: String
@level: "ERROR" | "DEBUG" | "WARN"
@stack_trace: String?
LEVELS_STRING: Array[String]

def initialize: (message: String, level: Symbol, ?stack_trace: String?) -> void
attr_reader count: Integer
attr_reader message: String
attr_reader level: String
attr_reader stack_trace: String?

def payload: () -> { logs: [Hash[Symbol, String]] }
def initialize: (message: String, level: (Symbol|String), ?stack_trace: String?, ?count: Integer) -> void

def payload: () -> { logs: [Hash[Symbol, (String|Integer)]] }
end

class Distributions < GenerateMetrics
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module Datadog

private

def deduplicate_logs: (Array[Event::Base] events) -> Array[Event::Base]

def heartbeat!: () -> void

def started!: () -> void
Expand Down
Loading
Loading