Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ECS Compatibility #12305

Merged
merged 15 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func normalizeSetting(setting string) (string, error) {
"pipeline.batch.delay",
"pipeline.unsafe_shutdown",
"pipeline.java_execution",
"pipeline.ecs_compatibility"
"pipeline.plugin_classloaders",
"path.config",
"config.string",
Expand Down
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
logger.warn("deprecated setting `config.field_reference.parser` set; field reference parsing is strict by default")
end

if @settings.set?('pipeline.ecs_compatibility')
ecs_compatibility_value = settings.get('pipeline.ecs_compatibility')
if ecs_compatibility_value != 'disabled'
logger.warn("Setting `pipeline.ecs_compatibility` given as `#{ecs_compatibility_value}`; " +
"values other than `disabled` are currently considered BETA and may have unintended consequences when upgrading minor versions of Logstash.")
end
end

# This is for backward compatibility in the tests
if source_loader.nil?
@source_loader = LogStash::Config::SourceLoader.new
Expand Down
4 changes: 3 additions & 1 deletion logstash-core/lib/logstash/codecs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def flush(&block)

public
def clone
return self.class.new(params)
LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, self.class, params).tap do |klone|
klone.metric = @metric if klone.instance_variable_get(:@metric).nil?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 seems new, help me understand why do we now copy the @metric over?

Copy link
Member Author

@yaauie yaauie Oct 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resolves a bug in which codecs that are cloned undercount their metrics, since only the original is "connected" to the metrics store and the copies all end up with a null implementation. Inputs like TCP and File clone their codecs per connection/file, which is what allows codecs like multiline to safely retain state across invocations.

In reality, most of the metrics that are recorded are handled by the delegator that wraps the codec, so this only really addresses codecs that go out of their way to record additional metrics.

end
end
end; end # class LogStash::Codecs::Base
6 changes: 3 additions & 3 deletions logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ def compile_initializer
# If any parent is a Plugin, this must be a codec.

if attributes.elements.nil?
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, {}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
else
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)

attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
end
end

Expand All @@ -271,7 +271,7 @@ def compile
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))"
end
end

Expand Down
13 changes: 12 additions & 1 deletion logstash-core/lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ def config_init(params)
params[name.to_s] = deep_replace(value)
end

# Intercept codecs that have not been instantiated
params.each do |name, value|
validator = self.class.validator_find(name)
next unless validator && validator[:validate] == :codec && value.kind_of?(String)

codec_klass = LogStash::Plugin.lookup("codec", value)
codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass)

params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and these instantiated on demand previously?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a weird one.

When a pipeline definition included a codec that parsed by the LIR as a plugin definition (e.g., codec => rubydebug {}), the codec was instantiated by the plugin factory in the normal way, but when the LIR parsed a codec as a string (quoted or bareword. e.g., codec => rubydebug), the instantiation was done by the input/output in Config::Mixin::validate_value(value, validator) which as a class method does not have access to the instance's execution context.

It looks like I forgot to include the removal of the string handling there, which should now never happen because we intercept it here.

end

if !self.class.validate(params)
raise LogStash::ConfigurationError,
I18n.t("logstash.runner.configuration.invalid_plugin_settings")
Expand Down Expand Up @@ -190,7 +201,7 @@ def config(name, opts={})
name = name.to_s if name.is_a?(Symbol)
@config[name] = opts # ok if this is empty

if name.is_a?(String)
if name.is_a?(String) && opts.fetch(:attr_accessor, true)
define_method(name) { instance_variable_get("@#{name}") }
define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) }
end
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ module Environment
Setting::Boolean.new("pipeline.plugin_classloaders", false),
Setting::Boolean.new("pipeline.separate_logs", false),
Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
Setting::CoercibleString.new("pipeline.ecs_compatibility", "disabled", true, %w(disabled v1 v2)),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),
Setting::Boolean.new("config.debug", false),
Expand Down
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def metric=(metric)

def execution_context=(context)
super
# There is no easy way to propage an instance variable into the codec, because the codec
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
# Setting the execution context after initialization is deprecated and will be removed in
# a future release of Logstash. While this code is no longer executed from Logstash core,
# we continue to propagate a set execution context to an input's codec, and rely on super's
# deprecation warning.
@codec.execution_context = context
context
end
Expand Down
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ def metric=(metric)

def execution_context=(context)
super
# There is no easy way to propage an instance variable into the codec, because the codec
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
# Setting the execution context after initialization is deprecated and will be removed in
# a future release of Logstash. While this code is no longer executed from Logstash core,
# we continue to propagate a set execution context to an output's codec, and rely on super's
# deprecation warning.
@codec.execution_context = context
context
end
Expand Down
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def non_reloadable_plugins
private


def plugin(plugin_type, name, source, *args)
@plugin_factory.plugin(plugin_type, name, source, *args)
def plugin(plugin_type, name, args, source)
@plugin_factory.plugin(plugin_type, name, args, source)
end

def default_logging_keys(other_keys = {})
Expand Down
16 changes: 14 additions & 2 deletions logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

require "logstash/config/mixin"
require "logstash/plugins/ecs_compatibility_support"
require "concurrent"
require "securerandom"

Expand All @@ -24,11 +25,12 @@
class LogStash::Plugin
include LogStash::Util::Loggable

attr_accessor :params, :execution_context
attr_accessor :params

NL = "\n"

include LogStash::Config::Mixin
include LogStash::Plugins::ECSCompatibilitySupport

# Disable or enable metric logging for this specific plugin instance
# by default we record all the metrics we can, but you can disable metrics collection
Expand Down Expand Up @@ -60,7 +62,7 @@ def eql?(other)
self.class.name == other.class.name && @params == other.params
end

def initialize(params=nil)
def initialize(params={})
@logger = self.logger
@deprecation_logger = self.deprecation_logger
# need to access settings statically because plugins are initialized in config_ast with no context.
Expand Down Expand Up @@ -177,4 +179,14 @@ def self.lookup(type, name)
def plugin_metadata
LogStash::PluginMetadata.for_plugin(self.id)
end

# Deprecated attr_writer for execution_context
def execution_context=(new_context)
@deprecation_logger.deprecated("LogStash::Plugin#execution_context=(new_ctx) is deprecated. Use LogStash::Plugins::Contextualizer#initialize_plugin(new_ctx, klass, args) instead", :caller => caller.first)
@execution_context = new_context
end

def execution_context
@execution_context || LogStash::ExecutionContext::Empty
end
end # class LogStash::Plugin
53 changes: 53 additions & 0 deletions logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module LogStash
module Plugins
module ECSCompatibilitySupport
def self.included(base)
base.extend(ArgumentValidator)
base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument,
:attr_accessor => false)
end

MUTEX = Mutex.new
private_constant :MUTEX

def ecs_compatibility
@_ecs_compatibility || MUTEX.synchronize do
@_ecs_compatibility ||= begin
# use config_init-set value if present
break @ecs_compatibility unless @ecs_compatibility.nil?

pipeline = execution_context.pipeline
pipeline_settings = pipeline && pipeline.settings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, this taking care of manual plugin instantiation (nil pipeline) e.g. in tests LS::Inputs::Twitter.new params, right?

pipeline_settings ||= LogStash::SETTINGS

if !pipeline_settings.set?('pipeline.ecs_compatibility')
deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " +
"To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.")
end

pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym
end
end
end

module ArgumentValidator
V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze
private_constant :V_PREFIXED_INTEGER_PATTERN

def validate_value(value, validator)
return super unless validator == :ecs_compatibility_argument

value = deep_replace(value)
value = hash_or_array(value)

if value.size == 1
return true, :disabled if value.first.to_s == 'disabled'
return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN
end

return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}"
end
end
end
end
end
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ class LogStash::Runner < Clamp::StrictCommand
:attribute_name => "pipeline.unsafe_shutdown",
:default => LogStash::SETTINGS.get_default("pipeline.unsafe_shutdown")

option ["--pipeline.ecs_compatibility"], "STRING",
I18n.t("logstash.runner.flag.ecs_compatibility"),
:attribute_name => "pipeline.ecs_compatibility",
:default => LogStash::SETTINGS.get_default('pipeline.ecs_compatibility')

# Data Path Setting
option ["--path.data"] , "PATH",
I18n.t("logstash.runner.flag.datapath"),
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Settings
"pipeline.system",
"pipeline.workers",
"pipeline.ordered",
"pipeline.ecs_compatibility",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.writes",
Expand Down
15 changes: 15 additions & 0 deletions logstash-core/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ en:
if there are still inflight events in memory.
By default, logstash will refuse to quit until all
received events have been pushed to the outputs.
ecs_compatibility: |+
Sets the pipeline's default value for `ecs_compatibility`,
a setting that is available to plugins that implement
an ECS Compatibility mode for use with the Elastic Common
Schema.
Possible values are:
- disabled (default)
- v1
- v2
This option allows the early opt-in (or preemptive opt-out)
of ECS Compatibility modes in plugins, which is scheduled to
be on-by-default in a future major release of Logstash.

Values other than `disabled` are currently considered BETA,
and may produce unintended consequences when upgrading Logstash.
rubyshell: |+
Drop to shell instead of running as normal.
Valid shells are "irb" and "pry"
Expand Down
59 changes: 59 additions & 0 deletions logstash-core/spec/logstash/execution_context_factory_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "spec_helper"

describe LogStash::Plugins::ExecutionContextFactory do
let(:pipeline) { double('Pipeline') }
let(:agent) { double('Agent') }
let(:inner_dlq_writer) { nil }

subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) }

context '#create' do
let(:plugin_id) { SecureRandom.uuid }
let(:plugin_type) { 'input' }

context 'the resulting instance' do
subject(:instance) { factory.create(plugin_id, plugin_type) }

it 'retains the pipeline from the factory' do
expect(instance.pipeline).to be(pipeline)
end

it 'retains the agent from the factory' do
expect(instance.agent).to be(agent)
end

it 'has a dlq_writer' do
expect(instance.dlq_writer).to_not be_nil
end

context 'dlq_writer' do
subject(:instance_dlq_writer) { instance.dlq_writer }

it 'retains the plugin id' do
expect(instance_dlq_writer.plugin_id).to eq(plugin_id)
end

it 'retains the plugin type' do
expect(instance_dlq_writer.plugin_type).to eq(plugin_type)
end
end
end
end
end
8 changes: 3 additions & 5 deletions logstash-core/spec/logstash/execution_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
allow(pipeline).to receive(:pipeline_id).and_return(pipeline_id)
end

subject { described_class.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) }
subject { described_class.new(pipeline, agent, dlq_writer) }

it "returns the `pipeline_id`" do
expect(subject.pipeline_id).to eq(pipeline_id)
Expand All @@ -44,9 +44,7 @@
expect(subject.agent).to eq(agent)
end

it "returns the plugin-specific dlq writer" do
expect(subject.dlq_writer.plugin_type).to eq(plugin_type)
expect(subject.dlq_writer.plugin_id).to eq(plugin_id)
expect(subject.dlq_writer.inner_writer).to eq(dlq_writer)
it "returns the dlq writer" do
expect(subject.dlq_writer).to be(dlq_writer)
end
end
Loading