-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ECS Compatibility #12305
ECS Compatibility #12305
Changes from 14 commits
9947f4b
2694a40
52721ce
ffe82db
6988c70
8ad8c8f
d42818d
58b6099
9d129ce
bf36a09
f91733a
453e019
87bf1dc
9a15aa1
3fc312d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,6 +99,17 @@ def config_init(params) | |
params[name.to_s] = deep_replace(value) | ||
end | ||
|
||
# Intercept codecs that have not been instantiated | ||
params.each do |name, value| | ||
validator = self.class.validator_find(name) | ||
next unless validator && validator[:validate] == :codec && value.kind_of?(String) | ||
|
||
codec_klass = LogStash::Plugin.lookup("codec", value) | ||
codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass) | ||
|
||
params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and these instantiated on demand previously? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a weird one. When a pipeline definition included a codec that parsed by the LIR as a plugin definition (e.g., It looks like I forgot to include the removal of the string handling there, which should now never happen because we intercept it here. |
||
end | ||
|
||
if !self.class.validate(params) | ||
raise LogStash::ConfigurationError, | ||
I18n.t("logstash.runner.configuration.invalid_plugin_settings") | ||
|
@@ -190,7 +201,7 @@ def config(name, opts={}) | |
name = name.to_s if name.is_a?(Symbol) | ||
@config[name] = opts # ok if this is empty | ||
|
||
if name.is_a?(String) | ||
if name.is_a?(String) && opts.fetch(:attr_accessor, true) | ||
define_method(name) { instance_variable_get("@#{name}") } | ||
define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) } | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
module LogStash | ||
module Plugins | ||
module ECSCompatibilitySupport | ||
def self.included(base) | ||
base.extend(ArgumentValidator) | ||
base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument, | ||
:attr_accessor => false) | ||
end | ||
|
||
MUTEX = Mutex.new | ||
private_constant :MUTEX | ||
|
||
def ecs_compatibility | ||
@_ecs_compatibility || MUTEX.synchronize do | ||
@_ecs_compatibility ||= begin | ||
# use config_init-set value if present | ||
break @ecs_compatibility unless @ecs_compatibility.nil? | ||
|
||
pipeline = execution_context.pipeline | ||
pipeline_settings = pipeline && pipeline.settings | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so, this taking care of manual plugin instantiation ( |
||
pipeline_settings ||= LogStash::SETTINGS | ||
|
||
if !pipeline_settings.set?('pipeline.ecs_compatibility') | ||
deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " + | ||
"To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.") | ||
end | ||
|
||
pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym | ||
end | ||
end | ||
end | ||
|
||
module ArgumentValidator | ||
V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze | ||
private_constant :V_PREFIXED_INTEGER_PATTERN | ||
|
||
def validate_value(value, validator) | ||
return super unless validator == :ecs_compatibility_argument | ||
|
||
value = deep_replace(value) | ||
value = hash_or_array(value) | ||
|
||
if value.size == 1 | ||
return true, :disabled if value.first.to_s == 'disabled' | ||
return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN | ||
end | ||
|
||
return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}" | ||
end | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# Licensed to Elasticsearch B.V. under one or more contributor | ||
# license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright | ||
# ownership. Elasticsearch B.V. licenses this file to you under | ||
# the Apache License, Version 2.0 (the "License"); you may | ||
# not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
require "spec_helper" | ||
|
||
describe LogStash::Plugins::ExecutionContextFactory do | ||
let(:pipeline) { double('Pipeline') } | ||
let(:agent) { double('Agent') } | ||
let(:inner_dlq_writer) { nil } | ||
|
||
subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) } | ||
|
||
context '#create' do | ||
let(:plugin_id) { SecureRandom.uuid } | ||
let(:plugin_type) { 'input' } | ||
|
||
context 'the resulting instance' do | ||
subject(:instance) { factory.create(plugin_id, plugin_type) } | ||
|
||
it 'retains the pipeline from the factory' do | ||
expect(instance.pipeline).to be(pipeline) | ||
end | ||
|
||
it 'retains the agent from the factory' do | ||
expect(instance.agent).to be(agent) | ||
end | ||
|
||
it 'has a dlq_writer' do | ||
expect(instance.dlq_writer).to_not be_nil | ||
end | ||
|
||
context 'dlq_writer' do | ||
subject(:instance_dlq_writer) { instance.dlq_writer } | ||
|
||
it 'retains the plugin id' do | ||
expect(instance_dlq_writer.plugin_id).to eq(plugin_id) | ||
end | ||
|
||
it 'retains the plugin type' do | ||
expect(instance_dlq_writer.plugin_type).to eq(plugin_type) | ||
end | ||
end | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 seems new, help me understand why do we now copy the
@metric
over?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This resolves a bug in which codecs that are cloned undercount their metrics, since only the original is "connected" to the metrics store and the copies all end up with a null implementation. Inputs like TCP and File clone their codecs per connection/file, which is what allows codecs like
multiline
to safely retain state across invocations.In reality, most of the metrics that are recorded are handled by the delegator that wraps the codec, so this only really addresses codecs that go out of their way to record additional metrics.