diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go index 82483aae853..860d4da8fc4 100644 --- a/docker/data/logstash/env2yaml/env2yaml.go +++ b/docker/data/logstash/env2yaml/env2yaml.go @@ -57,6 +57,7 @@ func normalizeSetting(setting string) (string, error) { "pipeline.batch.delay", "pipeline.unsafe_shutdown", "pipeline.java_execution", + "pipeline.ecs_compatibility" "pipeline.plugin_classloaders", "path.config", "config.string", diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index f05a3b9f7f6..d1f4743f999 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -75,6 +75,14 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) logger.warn("deprecated setting `config.field_reference.parser` set; field reference parsing is strict by default") end + if @settings.set?('pipeline.ecs_compatibility') + ecs_compatibility_value = settings.get('pipeline.ecs_compatibility') + if ecs_compatibility_value != 'disabled' + logger.warn("Setting `pipeline.ecs_compatibility` given as `#{ecs_compatibility_value}`; " + + "values other than `disabled` are currently considered BETA and may have unintended consequences when upgrading minor versions of Logstash.") + end + end + # This is for backward compatibility in the tests if source_loader.nil? @source_loader = LogStash::Config::SourceLoader.new diff --git a/logstash-core/lib/logstash/codecs/base.rb b/logstash-core/lib/logstash/codecs/base.rb index db4cde703ce..081a3b376e3 100644 --- a/logstash-core/lib/logstash/codecs/base.rb +++ b/logstash-core/lib/logstash/codecs/base.rb @@ -92,6 +92,8 @@ def flush(&block) public def clone - return self.class.new(params) + LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, self.class, params).tap do |klone| + klone.metric = @metric if klone.instance_variable_get(:@metric).nil? + end end end; end # class LogStash::Codecs::Base diff --git a/logstash-core/lib/logstash/config/config_ast.rb b/logstash-core/lib/logstash/config/config_ast.rb index e3d413bf5e4..c7c5f9cfee0 100644 --- a/logstash-core/lib/logstash/config/config_ast.rb +++ b/logstash-core/lib/logstash/config/config_ast.rb @@ -249,12 +249,12 @@ def compile_initializer # If any parent is a Plugin, this must be a codec. if attributes.elements.nil? - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, {}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n") else settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" << (plugin_type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n") end end @@ -271,7 +271,7 @@ def compile when "codec" settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" end end diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index a8febea0b0e..38ef99b9d8b 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -49,6 +49,7 @@ module LogStash::Config::Mixin include LogStash::Util::SubstitutionVariables + include LogStash::Util::Loggable attr_accessor :config attr_accessor :original_params @@ -99,6 +100,17 @@ def config_init(params) params[name.to_s] = deep_replace(value) end + # Intercept codecs that have not been instantiated + params.each do |name, value| + validator = self.class.validator_find(name) + next unless validator && validator[:validate] == :codec && value.kind_of?(String) + + codec_klass = LogStash::Plugin.lookup("codec", value) + codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass) + + params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance) + end + if !self.class.validate(params) raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.invalid_plugin_settings") @@ -190,7 +202,7 @@ def config(name, opts={}) name = name.to_s if name.is_a?(Symbol) @config[name] = opts # ok if this is empty - if name.is_a?(String) + if name.is_a?(String) && opts.fetch(:attr_accessor, true) define_method(name) { instance_variable_get("@#{name}") } define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) } end @@ -429,6 +441,11 @@ def validate_value(value, validator) case validator when :codec if value.first.is_a?(String) + # A plugin's codecs should be instantiated by `PluginFactory` or in `Config::Mixin#config_init(Hash)`, + # which ensure the inner plugin has access to the outer's execution context and metric store. + # This deprecation exists to warn plugins that call `Config::Mixin::validate_value` directly. + self.deprecation_logger.deprecated("Codec instantiated by `Config::Mixin::DSL::validate_value(String, :codec)` which cannot propagate parent plugin's execution context or metrics. ", + self.logger.debug? ? {:backtrace => caller} : {}) value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new return true, value else diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 89d5ad1c86d..72e1f6d9599 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -62,6 +62,7 @@ module Environment Setting::Boolean.new("pipeline.plugin_classloaders", false), Setting::Boolean.new("pipeline.separate_logs", false), Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]), + Setting::CoercibleString.new("pipeline.ecs_compatibility", "disabled", true, %w(disabled v1 v2)), Setting.new("path.plugins", Array, []), Setting::NullableString.new("interactive", nil, false), Setting::Boolean.new("config.debug", false), diff --git a/logstash-core/lib/logstash/inputs/base.rb b/logstash-core/lib/logstash/inputs/base.rb index a01b011e3f8..3ea40a339b9 100644 --- a/logstash-core/lib/logstash/inputs/base.rb +++ b/logstash-core/lib/logstash/inputs/base.rb @@ -124,10 +124,10 @@ def metric=(metric) def execution_context=(context) super - # There is no easy way to propage an instance variable into the codec, because the codec - # are created at the class level - # TODO(talevy): Codecs should have their own execution_context, for now they will inherit their - # parent plugin's + # Setting the execution context after initialization is deprecated and will be removed in + # a future release of Logstash. While this code is no longer executed from Logstash core, + # we continue to propagate a set execution context to an input's codec, and rely on super's + # deprecation warning. @codec.execution_context = context context end diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index c52dad18f9b..ed3a2827224 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -127,10 +127,10 @@ def metric=(metric) def execution_context=(context) super - # There is no easy way to propage an instance variable into the codec, because the codec - # are created at the class level - # TODO(talevy): Codecs should have their own execution_context, for now they will inherit their - # parent plugin's + # Setting the execution context after initialization is deprecated and will be removed in + # a future release of Logstash. While this code is no longer executed from Logstash core, + # we continue to propagate a set execution context to an output's codec, and rely on super's + # deprecation warning. @codec.execution_context = context context end diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 54089d5180a..6b3ec1733eb 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -89,8 +89,8 @@ def non_reloadable_plugins private - def plugin(plugin_type, name, source, *args) - @plugin_factory.plugin(plugin_type, name, source, *args) + def plugin(plugin_type, name, args, source) + @plugin_factory.plugin(plugin_type, name, args, source) end def default_logging_keys(other_keys = {}) diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index b376bd7d71f..a21e183929c 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -16,6 +16,7 @@ # under the License. require "logstash/config/mixin" +require "logstash/plugins/ecs_compatibility_support" require "concurrent" require "securerandom" @@ -24,11 +25,12 @@ class LogStash::Plugin include LogStash::Util::Loggable - attr_accessor :params, :execution_context + attr_accessor :params NL = "\n" include LogStash::Config::Mixin + include LogStash::Plugins::ECSCompatibilitySupport # Disable or enable metric logging for this specific plugin instance # by default we record all the metrics we can, but you can disable metrics collection @@ -60,7 +62,7 @@ def eql?(other) self.class.name == other.class.name && @params == other.params end - def initialize(params=nil) + def initialize(params={}) @logger = self.logger @deprecation_logger = self.deprecation_logger # need to access settings statically because plugins are initialized in config_ast with no context. @@ -177,4 +179,14 @@ def self.lookup(type, name) def plugin_metadata LogStash::PluginMetadata.for_plugin(self.id) end + + # Deprecated attr_writer for execution_context + def execution_context=(new_context) + @deprecation_logger.deprecated("LogStash::Plugin#execution_context=(new_ctx) is deprecated. Use LogStash::Plugins::Contextualizer#initialize_plugin(new_ctx, klass, args) instead", :caller => caller.first) + @execution_context = new_context + end + + def execution_context + @execution_context || LogStash::ExecutionContext::Empty + end end # class LogStash::Plugin diff --git a/logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb b/logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb new file mode 100644 index 00000000000..f87aff81c67 --- /dev/null +++ b/logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb @@ -0,0 +1,53 @@ +module LogStash + module Plugins + module ECSCompatibilitySupport + def self.included(base) + base.extend(ArgumentValidator) + base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument, + :attr_accessor => false) + end + + MUTEX = Mutex.new + private_constant :MUTEX + + def ecs_compatibility + @_ecs_compatibility || MUTEX.synchronize do + @_ecs_compatibility ||= begin + # use config_init-set value if present + break @ecs_compatibility unless @ecs_compatibility.nil? + + pipeline = execution_context.pipeline + pipeline_settings = pipeline && pipeline.settings + pipeline_settings ||= LogStash::SETTINGS + + if !pipeline_settings.set?('pipeline.ecs_compatibility') + deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " + + "To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.") + end + + pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym + end + end + end + + module ArgumentValidator + V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze + private_constant :V_PREFIXED_INTEGER_PATTERN + + def validate_value(value, validator) + return super unless validator == :ecs_compatibility_argument + + value = deep_replace(value) + value = hash_or_array(value) + + if value.size == 1 + return true, :disabled if value.first.to_s == 'disabled' + return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN + end + + return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}" + end + end + end + end +end diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 0b5304e5350..9a0cf1bcf56 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -154,6 +154,11 @@ class LogStash::Runner < Clamp::StrictCommand :attribute_name => "pipeline.unsafe_shutdown", :default => LogStash::SETTINGS.get_default("pipeline.unsafe_shutdown") + option ["--pipeline.ecs_compatibility"], "STRING", + I18n.t("logstash.runner.flag.ecs_compatibility"), + :attribute_name => "pipeline.ecs_compatibility", + :default => LogStash::SETTINGS.get_default('pipeline.ecs_compatibility') + # Data Path Setting option ["--path.data"] , "PATH", I18n.t("logstash.runner.flag.datapath"), diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index b65e1bb8946..2d1b509a2e1 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -48,6 +48,7 @@ class Settings "pipeline.system", "pipeline.workers", "pipeline.ordered", + "pipeline.ecs_compatibility", "queue.checkpoint.acks", "queue.checkpoint.interval", "queue.checkpoint.writes", diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index 25a8304e2b7..8890d9ecd0f 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -362,6 +362,21 @@ en: if there are still inflight events in memory. By default, logstash will refuse to quit until all received events have been pushed to the outputs. + ecs_compatibility: |+ + Sets the pipeline's default value for `ecs_compatibility`, + a setting that is available to plugins that implement + an ECS Compatibility mode for use with the Elastic Common + Schema. + Possible values are: + - disabled (default) + - v1 + - v2 + This option allows the early opt-in (or preemptive opt-out) + of ECS Compatibility modes in plugins, which is scheduled to + be on-by-default in a future major release of Logstash. + + Values other than `disabled` are currently considered BETA, + and may produce unintended consequences when upgrading Logstash. rubyshell: |+ Drop to shell instead of running as normal. Valid shells are "irb" and "pry" diff --git a/logstash-core/spec/logstash/config/mixin_spec.rb b/logstash-core/spec/logstash/config/mixin_spec.rb index 3fa8f09659e..60c2611363e 100644 --- a/logstash-core/spec/logstash/config/mixin_spec.rb +++ b/logstash-core/spec/logstash/config/mixin_spec.rb @@ -47,6 +47,31 @@ end end + context 'DSL::validate_value(String, :codec)' do + subject(:plugin_class) { Class.new(LogStash::Filters::Base) { config_name "test_deprecated_two" } } + let(:codec_class) { Class.new(LogStash::Codecs::Base) { config_name 'dummy' } } + let(:deprecation_logger) { double("DeprecationLogger").as_null_object } + + before(:each) do + allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger) + allow(LogStash::Plugin).to receive(:lookup).with("codec", codec_class.config_name).and_return(codec_class) + end + + it 'instantiates the codec' do + success, codec = plugin_class.validate_value(codec_class.config_name, :codec) + + expect(success).to be true + expect(codec.class).to eq(codec_class) + end + + it 'logs a deprecation' do + plugin_class.validate_value(codec_class.config_name, :codec) + expect(deprecation_logger).to have_received(:deprecated) do |message| + expect(message).to include("validate_value(String, :codec)") + end + end + end + context "when validating :bytes successfully" do subject do local_num_bytes = num_bytes # needs to be locally scoped :( diff --git a/logstash-core/spec/logstash/execution_context_factory_spec.rb b/logstash-core/spec/logstash/execution_context_factory_spec.rb new file mode 100644 index 00000000000..e20fd6cb68b --- /dev/null +++ b/logstash-core/spec/logstash/execution_context_factory_spec.rb @@ -0,0 +1,59 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" + +describe LogStash::Plugins::ExecutionContextFactory do + let(:pipeline) { double('Pipeline') } + let(:agent) { double('Agent') } + let(:inner_dlq_writer) { nil } + + subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) } + + context '#create' do + let(:plugin_id) { SecureRandom.uuid } + let(:plugin_type) { 'input' } + + context 'the resulting instance' do + subject(:instance) { factory.create(plugin_id, plugin_type) } + + it 'retains the pipeline from the factory' do + expect(instance.pipeline).to be(pipeline) + end + + it 'retains the agent from the factory' do + expect(instance.agent).to be(agent) + end + + it 'has a dlq_writer' do + expect(instance.dlq_writer).to_not be_nil + end + + context 'dlq_writer' do + subject(:instance_dlq_writer) { instance.dlq_writer } + + it 'retains the plugin id' do + expect(instance_dlq_writer.plugin_id).to eq(plugin_id) + end + + it 'retains the plugin type' do + expect(instance_dlq_writer.plugin_type).to eq(plugin_type) + end + end + end + end +end diff --git a/logstash-core/spec/logstash/execution_context_spec.rb b/logstash-core/spec/logstash/execution_context_spec.rb index d1e529cbf92..4ec9acb21a3 100644 --- a/logstash-core/spec/logstash/execution_context_spec.rb +++ b/logstash-core/spec/logstash/execution_context_spec.rb @@ -30,7 +30,7 @@ allow(pipeline).to receive(:pipeline_id).and_return(pipeline_id) end - subject { described_class.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) } + subject { described_class.new(pipeline, agent, dlq_writer) } it "returns the `pipeline_id`" do expect(subject.pipeline_id).to eq(pipeline_id) @@ -44,9 +44,7 @@ expect(subject.agent).to eq(agent) end - it "returns the plugin-specific dlq writer" do - expect(subject.dlq_writer.plugin_type).to eq(plugin_type) - expect(subject.dlq_writer.plugin_id).to eq(plugin_id) - expect(subject.dlq_writer.inner_writer).to eq(dlq_writer) + it "returns the dlq writer" do + expect(subject.dlq_writer).to be(dlq_writer) end end diff --git a/logstash-core/spec/logstash/inputs/base_spec.rb b/logstash-core/spec/logstash/inputs/base_spec.rb index 9540bbe384e..ecb34c0d164 100644 --- a/logstash-core/spec/logstash/inputs/base_spec.rb +++ b/logstash-core/spec/logstash/inputs/base_spec.rb @@ -86,18 +86,34 @@ def register; end subject(:instance) { klass.new({}) } - it "allow to set the context" do - expect(instance.execution_context).to be_nil - instance.execution_context = execution_context - - expect(instance.execution_context).to eq(execution_context) - end - - it "propagate the context to the codec" do - expect(instance.codec.execution_context).to be_nil - instance.execution_context = execution_context - - expect(instance.codec.execution_context).to eq(execution_context) + context 'execution_context=' do + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end + + it "allow to set the context" do + new_ctx = execution_context.dup + subject.execution_context = new_ctx + expect(subject.execution_context).to be(new_ctx) + end + + it "propagate the context to the codec" do + new_ctx = execution_context.dup + expect(instance.codec.execution_context).to_not be(new_ctx) + instance.execution_context = new_ctx + + expect(instance.execution_context).to be(new_ctx) + expect(instance.codec.execution_context).to be(new_ctx) + end + + it 'emits a deprecation warning' do + expect(deprecation_logger_stub).to receive(:deprecated) do |message| + expect(message).to match(/execution_context=/) + end + + instance.execution_context = execution_context + end end end diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 191d8275178..0e7df6d521f 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -102,18 +102,34 @@ def multi_receive_encoded(events_and_encoded) subject(:instance) { klass.new(params.dup) } - it "allow to set the context" do - expect(instance.execution_context).to be_nil - instance.execution_context = execution_context + context 'execution_context=' do + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end - expect(instance.execution_context).to eq(execution_context) - end + it "allow to set the context" do + new_ctx = execution_context.dup + subject.execution_context = new_ctx + expect(subject.execution_context).to be(new_ctx) + end - it "propagate the context to the codec" do - expect(instance.codec.execution_context).to be_nil - instance.execution_context = execution_context + it "propagate the context to the codec" do + new_ctx = execution_context.dup + expect(instance.codec.execution_context).to_not be(new_ctx) + instance.execution_context = new_ctx - expect(instance.codec.execution_context).to eq(execution_context) + expect(instance.execution_context).to be(new_ctx) + expect(instance.codec.execution_context).to be(new_ctx) + end + + it 'emits a deprecation warning' do + expect(deprecation_logger_stub).to receive(:deprecated) do |message| + expect(message).to match(/execution_context=/) + end + + instance.execution_context = execution_context + end end end diff --git a/logstash-core/spec/logstash/plugin_spec.rb b/logstash-core/spec/logstash/plugin_spec.rb index e56299dd582..e286e2950b6 100644 --- a/logstash-core/spec/logstash/plugin_spec.rb +++ b/logstash-core/spec/logstash/plugin_spec.rb @@ -69,13 +69,28 @@ def self.reloadable? end context "#execution_context" do - subject { Class.new(LogStash::Plugin).new({}) } + let(:klass) { Class.new(LogStash::Plugin) } + subject(:instance) { klass.new({}) } include_context "execution_context" - it "can be set and get" do - expect(subject.execution_context).to be_nil - subject.execution_context = execution_context - expect(subject.execution_context).to eq(execution_context) + context 'execution_context=' do + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end + + it "can be set and get" do + new_ctx = execution_context.dup + subject.execution_context = new_ctx + expect(subject.execution_context).to eq(new_ctx) + end + + it 'emits a deprecation warning' do + expect(deprecation_logger_stub).to receive(:deprecated) do |message| + expect(message).to match(/execution_context=/) + end + instance.execution_context = execution_context + end end end @@ -402,6 +417,66 @@ def register; end end end + describe "#ecs_compatibility" do + let(:plugin_class) do + Class.new(LogStash::Filters::Base) do + config_name "ecs_validator_sample" + def register; end + end + end + let(:config) { Hash.new } + let(:instance) { plugin_class.new(config) } + + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end + + context 'when plugin initialized with explicit value' do + let(:config) { super().merge("ecs_compatibility" => "v17") } + it 'returns the explicitly-given value' do + expect(instance.ecs_compatibility).to eq(:v17) + end + end + + context 'when plugin is not initialized with an explicit value' do + let(:settings_stub) { LogStash::SETTINGS.clone } + + before(:each) do + allow(settings_stub).to receive(:get_value).with(anything).and_call_original # allow spies + stub_const('LogStash::SETTINGS', settings_stub) + end + + context 'and pipeline-level setting is explicitly `v1`' do + let(:settings_stub) do + super().tap do |settings| + settings.set_value('pipeline.ecs_compatibility', 'v1') + end + end + it 'reads the setting' do + expect(instance.ecs_compatibility).to eq(:v1) + + expect(settings_stub).to have_received(:get_value) + end + end + + context 'and pipeline-level setting is not specified' do + it 'emits a deprecation warning about using the default which may change' do + instance.ecs_compatibility + + expect(deprecation_logger_stub).to have_received(:deprecated) do |message| + expect(message).to include("Relying on default value of `pipeline.ecs_compatibility`") + end + end + it 'returns `disabled`' do + # Default value of `pipeline.ecs_compatibility` + expect(instance.ecs_compatibility).to eq(:disabled) + end + end + end + + end + describe "deprecation logger" do let(:config) do { diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb index 0dce1799339..042869df574 100644 --- a/logstash-core/spec/support/shared_contexts.rb +++ b/logstash-core/spec/support/shared_contexts.rb @@ -22,8 +22,9 @@ let(:plugin_id) { :plugin_id } let(:plugin_type) { :plugin_type } let(:dlq_writer) { double("dlq_writer") } + let(:execution_context_factory) { ::LogStash::Plugins::ExecutionContextFactory.new(agent, pipeline, dlq_writer) } let(:execution_context) do - ::LogStash::ExecutionContext.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) + execution_context_factory.create(plugin_id, plugin_type) end before do diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index d802679235b..abf86480298 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -26,6 +26,7 @@ import org.jruby.anno.JRubyClass; import org.jruby.exceptions.RaiseException; import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.Block; import org.jruby.runtime.ObjectAllocator; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.ackedqueue.QueueFactoryExt; @@ -74,6 +75,7 @@ import org.logstash.log.SlowLoggerExt; import org.logstash.plugins.HooksRegistryExt; import org.logstash.plugins.UniversalPluginExt; +import org.logstash.plugins.factory.ContextualizerExt; import org.logstash.util.UtilExt; import org.logstash.plugins.factory.ExecutionContextFactoryExt; import org.logstash.plugins.factory.PluginMetricsFactoryExt; @@ -199,6 +201,8 @@ public final class RubyUtil { public static final RubyClass PLUGIN_FACTORY_CLASS; + public static final RubyModule PLUGIN_CONTEXTUALIZER_MODULE; + public static final RubyClass LOGGER; public static final RubyModule LOGGABLE_MODULE; @@ -333,7 +337,7 @@ public final class RubyUtil { UTIL_MODULE = LOGSTASH_MODULE.defineModuleUnder("Util"); UTIL_MODULE.defineAnnotatedMethods(UtilExt.class); ABSTRACT_DLQ_WRITER_CLASS = UTIL_MODULE.defineClassUnder( - "AbstractDeadLetterQueueWriterExt", RUBY.getObject(), + "AbstractDeadLetterQueueWriter", RUBY.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR ); ABSTRACT_DLQ_WRITER_CLASS.defineAnnotatedMethods(AbstractDeadLetterQueueWriterExt.class); @@ -401,6 +405,7 @@ public final class RubyUtil { EXECUTION_CONTEXT_CLASS = setupLogstashClass( ExecutionContextExt::new, ExecutionContextExt.class ); + EXECUTION_CONTEXT_CLASS.defineConstant("Empty", EXECUTION_CONTEXT_CLASS.newInstance(RUBY.getCurrentContext(), RUBY.getNil(), RUBY.getNil(), RUBY.getNil(), Block.NULL_BLOCK)); RUBY_TIMESTAMP_CLASS = setupLogstashClass( JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class ); @@ -551,6 +556,8 @@ public final class RubyUtil { "PluginFactory", RUBY.getObject(), PluginFactoryExt::new ); PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.class); + PLUGIN_CONTEXTUALIZER_MODULE = PLUGINS_MODULE.defineOrGetModuleUnder("Contextualizer"); + PLUGIN_CONTEXTUALIZER_MODULE.defineAnnotatedMethods(ContextualizerExt.class); UNIVERSAL_PLUGIN_CLASS = setupLogstashClass(UniversalPluginExt::new, UniversalPluginExt.class); EVENT_DISPATCHER_CLASS = @@ -636,4 +643,18 @@ public static IRubyObject toRubyObject(Object javaObject) { return JavaUtil.convertJavaToRuby(RUBY, javaObject); } + /** + * Cast an IRubyObject that may be nil to a specific class + * @param objectOrNil an object of either type {@code } or nil. + * @param the type to cast non-nil values to + * @return The given value, cast to {@code }, or null. + */ + public static T nilSafeCast(final IRubyObject objectOrNil) { + if (objectOrNil == null || objectOrNil.isNil()) { return null; } + + @SuppressWarnings("unchecked") + final T objectAsCasted = (T) objectOrNil; + + return objectAsCasted; + } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 092e1c0e578..026ef64c999 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -164,8 +164,9 @@ private Map setupOutputs(ConfigVariableExpan outs.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); + final Map args = expandArguments(def, cve); res.put(v.getId(), pluginFactory.buildOutput( - RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve) + RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source )); }); return res; @@ -181,8 +182,9 @@ private Map setupFilters(ConfigVariableExpan for (final PluginVertex vertex : filterPlugins) { final PluginDefinition def = vertex.getPluginDefinition(); final SourceWithMetadata source = vertex.getSourceWithMetadata(); + final Map args = expandArguments(def, cve); res.put(vertex.getId(), pluginFactory.buildFilter( - RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve) + RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source )); } return res; @@ -197,71 +199,47 @@ private Collection setupInputs(ConfigVariableExpander cve) { vertices.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); + final Map args = expandArguments(def, cve); IRubyObject o = pluginFactory.buildInput( - RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)); + RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source); nodes.add(o); }); return nodes; } - /** - * Converts plugin arguments from the format provided by {@link PipelineIR} into coercible - * Ruby types. - * @param def PluginDefinition as provided by {@link PipelineIR} - * @return RubyHash of plugin arguments as understood by {@link RubyIntegration.PluginFactory} - * methods - */ - private RubyHash convertArgs(final PluginDefinition def) { + + final RubyHash convertArgs(final Map input) { final RubyHash converted = RubyHash.newHash(RubyUtil.RUBY); - for (final Map.Entry entry : def.getArguments().entrySet()) { + for (final Map.Entry entry : input.entrySet()) { final Object value = entry.getValue(); final String key = entry.getKey(); - final Object toput; - if (value instanceof PluginStatement) { - final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); - SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata(); - toput = pluginFactory.buildCodec( - RubyUtil.RUBY.newString(codec.getName()), - source, - Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), - codec.getArguments() - ); - } else { - toput = value; - } - converted.put(key, toput); + converted.put(key, value); } + return converted; } - /** - * Converts plugin arguments from the format provided by {@link PipelineIR} into coercible - * Java types for consumption by Java plugins. - * @param def PluginDefinition as provided by {@link PipelineIR} - * @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory} - * methods that create Java plugins - */ - private Map convertJavaArgs(final PluginDefinition def, ConfigVariableExpander cve) { - Map args = expandConfigVariables(cve, def.getArguments()); - for (final Map.Entry entry : args.entrySet()) { - final Object value = entry.getValue(); + + private Map expandArguments(final PluginDefinition pluginDefinition, final ConfigVariableExpander cve) { + Map arguments = expandConfigVariables(cve, pluginDefinition.getArguments()); + + // Intercept codec definitions from LIR + for (final Map.Entry entry : arguments.entrySet()) { final String key = entry.getKey(); - final IRubyObject toput; + final Object value = entry.getValue(); if (value instanceof PluginStatement) { - final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); - SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata(); - Map codecArgs = expandConfigVariables(cve, codec.getArguments()); - toput = pluginFactory.buildCodec( - RubyUtil.RUBY.newString(codec.getName()), - source, - Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), - codecArgs - ); - Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput); - args.put(key, javaCodec); + final PluginStatement codecPluginStatement = (PluginStatement) value; + final PluginDefinition codecDefinition = codecPluginStatement.getPluginDefinition(); + final SourceWithMetadata codecSource = codecPluginStatement.getSourceWithMetadata(); + final Map codecArguments = expandArguments(codecDefinition, cve); + IRubyObject codecInstance = pluginFactory.buildCodec(RubyUtil.RUBY.newString(codecDefinition.getName()), + Rubyfier.deep(RubyUtil.RUBY, codecArguments), + codecSource); + arguments.put(key, codecInstance); } } - return args; + + return arguments; } @SuppressWarnings({"rawtypes", "unchecked"}) diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java index c5afaac0592..b50f7665240 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java @@ -32,9 +32,14 @@ import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; +import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.execution.ExecutionContextExt; +import org.logstash.plugins.factory.ContextualizerExt; + +import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE; public final class OutputStrategyExt { @@ -176,6 +181,9 @@ public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod(required = 4) public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { + final RubyClass outputClass = (RubyClass) args[0]; + final IRubyObject metric = args[1]; + final ExecutionContextExt executionContext = (ExecutionContextExt) args[2]; final RubyHash pluginArgs = (RubyHash) args[3]; workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers")); if (workerCount.isNil()) { @@ -185,12 +193,9 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject[] a workerQueue = new ArrayBlockingQueue<>(count); workers = context.runtime.newArray(count); for (int i = 0; i < count; ++i) { - final RubyClass outputClass = (RubyClass) args[0]; - // Calling "new" here manually to allow mocking the ctor in RSpec Tests - final IRubyObject output = outputClass.callMethod(context, "new", pluginArgs); + final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs); initOutputCallsite(outputClass); - output.callMethod(context, "metric=", args[1]); - output.callMethod(context, "execution_context=", args[2]); + output.callMethod(context, "metric=", metric); workers.append(output); workerQueue.add(output); } @@ -248,11 +253,15 @@ protected SimpleAbstractOutputStrategyExt(final Ruby runtime, final RubyClass me @JRubyMethod(required = 4) public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { final RubyClass outputClass = (RubyClass) args[0]; + final IRubyObject metric = args[1]; + final ExecutionContextExt executionContext = (ExecutionContextExt) args[2]; + final RubyHash pluginArgs = (RubyHash) args[3]; + // TODO: fixup mocks // Calling "new" here manually to allow mocking the ctor in RSpec Tests - output = args[0].callMethod(context, "new", args[3]); + output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs); + initOutputCallsite(outputClass); - output.callMethod(context, "metric=", args[1]); - output.callMethod(context, "execution_context=", args[2]); + output.callMethod(context, "metric=", metric); return this; } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java deleted file mode 100644 index e476ad4368b..00000000000 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.logstash.config.ir.compiler; - -import co.elastic.logstash.api.Codec; -import org.jruby.RubyString; -import org.jruby.runtime.builtin.IRubyObject; -import co.elastic.logstash.api.Configuration; -import co.elastic.logstash.api.Context; -import co.elastic.logstash.api.Filter; -import co.elastic.logstash.api.Input; -import org.logstash.common.SourceWithMetadata; - -import java.util.Map; - -/** - * Factory that can instantiate Java plugins as well as Ruby plugins. - */ -public interface PluginFactory extends RubyIntegration.PluginFactory { - - Input buildInput(String name, String id, Configuration configuration, Context context); - - Filter buildFilter(String name, String id, Configuration configuration, Context context); - - final class Default implements PluginFactory { - - private final RubyIntegration.PluginFactory rubyFactory; - - public Default(final RubyIntegration.PluginFactory rubyFactory) { - this.rubyFactory = rubyFactory; - } - - @Override - public Input buildInput(final String name, final String id, final Configuration configuration, final Context context) { - return null; - } - - @Override - public Filter buildFilter(final String name, final String id, final Configuration configuration, final Context context) { - return null; - } - - @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { - return rubyFactory.buildInput(name, source, args, pluginArgs); - } - - @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, final Map pluginArgs) { - return rubyFactory.buildOutput(name, source, args, pluginArgs); - } - - @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, final Map pluginArgs) { - return rubyFactory.buildFilter(name, source, args, pluginArgs); - } - - @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { - return rubyFactory.buildCodec(name, source, args, pluginArgs); - } - - @Override - public Codec buildDefaultCodec(final String codecName) { - return null; - } - } -} diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index faac9f66d38..bc66371c67a 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -41,17 +41,13 @@ private RubyIntegration() { */ public interface PluginFactory { - IRubyObject buildInput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs); + IRubyObject buildInput(RubyString name, IRubyObject args, SourceWithMetadata source); - AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs); + AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, SourceWithMetadata source); - AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args, - Map pluginArgs); + AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, SourceWithMetadata source); - IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, - Map pluginArgs); + IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source); Codec buildDefaultCodec(String codecName); diff --git a/logstash-core/src/main/java/org/logstash/execution/Engine.java b/logstash-core/src/main/java/org/logstash/execution/Engine.java new file mode 100644 index 00000000000..112b6bf2d84 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/Engine.java @@ -0,0 +1,7 @@ +package org.logstash.execution; + +public enum Engine { + RUBY, + JAVA, + ; +} diff --git a/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java b/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java index e99cd52aa98..f7815c746cc 100644 --- a/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java @@ -25,6 +25,7 @@ import org.jruby.RubyObject; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; @@ -45,14 +46,16 @@ public ExecutionContextExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } - @JRubyMethod(required = 5) + @JRubyMethod(required = 2, optional = 1) public ExecutionContextExt initialize(final ThreadContext context, final IRubyObject[] args) { pipeline = args[0]; agent = args[1]; - dlqWriter = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( - context.runtime, RubyUtil.PLUGIN_DLQ_WRITER_CLASS - ).initialize(context, args[4], args[2], args[3]); + if (args.length > 2 && !args[2].isNil()) { + dlqWriter = (AbstractDeadLetterQueueWriterExt) args[2]; + } else { + dlqWriter = (AbstractDeadLetterQueueWriterExt) RubyUtil.DUMMY_DLQ_WRITER_CLASS.newInstance(context, Block.NULL_BLOCK); + } return this; } @@ -73,6 +76,9 @@ public IRubyObject pipeline(final ThreadContext context) { @JRubyMethod(name = "pipeline_id") public IRubyObject pipelineId(final ThreadContext context) { + if (pipeline.isNil()) { + return context.nil; + } return pipeline.callMethod(context, "pipeline_id"); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java index aea003dde43..7d58f2c1a15 100644 --- a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java @@ -77,7 +77,8 @@ public JavaBasePipelineExt initialize(final ThreadContext context, final IRubyOb new ExecutionContextFactoryExt( context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS ).initialize(context, args[3], this, dlqWriter(context)), - RubyUtil.FILTER_DELEGATOR_CLASS + RubyUtil.FILTER_DELEGATOR_CLASS, + Engine.JAVA ), getSecretStore(context) ); diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java new file mode 100644 index 00000000000..becba99cb85 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java @@ -0,0 +1,99 @@ +package org.logstash.plugins.factory; + +import org.jruby.RubyClass; +import org.jruby.RubyHash; +import org.jruby.RubyModule; +import org.jruby.anno.JRubyMethod; +import org.jruby.anno.JRubyModule; +import org.jruby.runtime.Block; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.execution.ExecutionContextExt; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.jruby.runtime.Helpers.invokeSuper; +import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE; + +/** + * The {@link ContextualizerExt} is used to inject {@link org.logstash.execution.ExecutionContextExt } to plugins + * before they are initialized. + * + * @see ContextualizerExt#initializePlugin(ThreadContext, IRubyObject, IRubyObject[], Block) + */ +@JRubyModule(name = "Contextualizer") +public class ContextualizerExt { + + private static final String EXECUTION_CONTEXT_IVAR_NAME = "@execution_context"; + + /* + * @overload PluginContextualizer::initialize_plugin(execution_context, plugin_class, *plugin_args, &pluginBlock) + */ + @JRubyMethod(name = "initialize_plugin", meta = true, required = 2, rest = true) + public static IRubyObject initializePlugin(final ThreadContext context, + final IRubyObject recv, + final IRubyObject[] args, + final Block block) { + final List argsList = new ArrayList<>(Arrays.asList(args)); + final ExecutionContextExt executionContext = RubyUtil.nilSafeCast(argsList.remove(0)); + final RubyClass pluginClass = (RubyClass) argsList.remove(0); + final IRubyObject[] pluginArgs = argsList.toArray(new IRubyObject[]{}); + + return initializePlugin(context, (RubyModule) recv, executionContext, pluginClass, pluginArgs, block); + } + + public static IRubyObject initializePlugin(final ThreadContext context, + @Nullable final ExecutionContextExt executionContextExt, + final RubyClass pluginClass, + final RubyHash pluginArgs) { + return initializePlugin(context, PLUGIN_CONTEXTUALIZER_MODULE, executionContextExt, pluginClass, new IRubyObject[]{pluginArgs}, Block.NULL_BLOCK); + } + + private static IRubyObject initializePlugin(final ThreadContext context, + final RubyModule recv, + @Nullable final ExecutionContextExt executionContext, + final RubyClass pluginClass, + final IRubyObject[] pluginArgs, + final Block block) { + synchronized (ContextualizerExt.class) { + if (!pluginClass.hasModuleInPrepends(recv)) { + pluginClass.prepend(context, new IRubyObject[]{recv}); + } + } + + final IRubyObject[] pluginInitArgs; + if (executionContext == null) { + pluginInitArgs = pluginArgs; + } else { + List pluginInitArgList = new ArrayList<>(1 + pluginArgs.length); + pluginInitArgList.add(executionContext); + pluginInitArgList.addAll(Arrays.asList(pluginArgs)); + pluginInitArgs = pluginInitArgList.toArray(new IRubyObject[]{}); + } + + // We must use IRubyObject#callMethod(...,"new",...) here to continue supporting + // mocking/validating from rspec. + return pluginClass.callMethod(context, "new", pluginInitArgs, block); + } + + @JRubyMethod(name = "initialize", rest = true, frame = true) // framed for invokeSuper + public static IRubyObject initialize(final ThreadContext context, + final IRubyObject recv, + final IRubyObject[] args, + final Block block) { + final List argsList = new ArrayList<>(Arrays.asList(args)); + + if (args.length > 0 && args[0] instanceof ExecutionContextExt) { + final ExecutionContextExt executionContext = (ExecutionContextExt) argsList.remove(0); + recv.getInstanceVariables().setInstanceVariable(EXECUTION_CONTEXT_IVAR_NAME, executionContext); + } + + final IRubyObject[] restArgs = argsList.toArray(new IRubyObject[]{}); + + return invokeSuper(context, recv, restArgs, block); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java index d72ef4a5cd9..2799eb12ce4 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java @@ -46,10 +46,14 @@ public ExecutionContextFactoryExt initialize(final ThreadContext context, final @JRubyMethod public ExecutionContextExt create(final ThreadContext context, final IRubyObject id, final IRubyObject classConfigName) { + final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( + context.runtime, RubyUtil.PLUGIN_DLQ_WRITER_CLASS + ).initialize(context, dlqWriter, id, classConfigName); + return new ExecutionContextExt( context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS ).initialize( - context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter} + context, new IRubyObject[]{pipeline, agent, dlqWriterForInstance} ); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java index a226e57d56d..0d3226e0755 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java @@ -5,6 +5,7 @@ import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; @@ -13,6 +14,7 @@ import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.compiler.*; import org.logstash.config.ir.graph.Vertex; +import org.logstash.execution.Engine; import org.logstash.execution.ExecutionContextExt; import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; @@ -21,6 +23,7 @@ import org.logstash.plugins.PluginLookup; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; @JRubyClass(name = "PluginFactory") public final class PluginFactoryExt extends RubyBasicObject @@ -35,7 +38,9 @@ public interface PluginResolver { private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id"); - private final Collection pluginsById = new HashSet<>(); + private final Collection pluginsById = ConcurrentHashMap.newKeySet(); + + private Engine engine; private PipelineIR lir; @@ -43,7 +48,7 @@ public interface PluginResolver { private PluginMetricsFactoryExt metrics; - private RubyClass filterClass; + private RubyClass filterDelegatorClass; private ConfigVariableExpander configVariables; @@ -54,16 +59,22 @@ public interface PluginResolver { @JRubyMethod(name = "filter_delegator", meta = true, required = 5) public static IRubyObject filterDelegator(final ThreadContext context, final IRubyObject recv, final IRubyObject... args) { + // filterDelegatorClass, klass, rubyArgs, typeScopedMetric, executionCntx + final RubyClass filterDelegatorClass = (RubyClass) args[0]; + final RubyClass klass = (RubyClass) args[1]; final RubyHash arguments = (RubyHash) args[2]; - final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); + final AbstractMetricExt typeScopedMetric = (AbstractMetricExt) args[3]; + final ExecutionContextExt executionContext = (ExecutionContextExt) args[4]; + + final IRubyObject filterInstance = ContextualizerExt.initializePlugin(context, executionContext, klass, arguments); + final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY); filterInstance.callMethod( context, "metric=", - ((AbstractMetricExt) args[3]).namespace(context, id.intern()) + typeScopedMetric.namespace(context, id.intern()) ); - filterInstance.callMethod(context, "execution_context=", args[4]); - return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS) - .initialize(context, filterInstance, id); + + return filterDelegatorClass.newInstance(context, filterInstance, id, Block.NULL_BLOCK); } public PluginFactoryExt(final Ruby runtime, final RubyClass metaClass) { @@ -80,25 +91,33 @@ public PluginFactoryExt initialize(final ThreadContext context, final IRubyObject[] args) { return init( args[0].toJava(PipelineIR.class), - (PluginMetricsFactoryExt) args[1], (ExecutionContextFactoryExt) args[2], - (RubyClass) args[3] + (PluginMetricsFactoryExt) args[1], + (ExecutionContextFactoryExt) args[2], + (RubyClass) args[3], + EnvironmentVariableProvider.defaultProvider(), + Engine.RUBY ); } - public PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics, - final ExecutionContextFactoryExt executionContextFactoryExt, - final RubyClass filterClass) { - return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider()); + public PluginFactoryExt init(final PipelineIR lir, + final PluginMetricsFactoryExt metrics, + final ExecutionContextFactoryExt executionContextFactoryExt, + final RubyClass filterClass, + final Engine engine) { + return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider(), engine); } - PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics, + PluginFactoryExt init(final PipelineIR lir, + final PluginMetricsFactoryExt metrics, final ExecutionContextFactoryExt executionContextFactoryExt, final RubyClass filterClass, - final EnvironmentVariableProvider envVars) { + final EnvironmentVariableProvider envVars, + final Engine engine) { this.lir = lir; this.metrics = metrics; this.executionContextFactory = executionContextFactoryExt; - this.filterClass = filterClass; + this.filterDelegatorClass = filterClass; + this.engine = engine; this.pluginCreatorsRegistry.put(PluginLookup.PluginType.INPUT, new InputPluginCreator(this)); this.pluginCreatorsRegistry.put(PluginLookup.PluginType.CODEC, new CodecPluginCreator()); this.pluginCreatorsRegistry.put(PluginLookup.PluginType.FILTER, new FilterPluginCreator()); @@ -109,71 +128,102 @@ PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metric @SuppressWarnings("unchecked") @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(), - source, (Map) args, pluginArgs + RubyUtil.RUBY.getCurrentContext(), + PluginLookup.PluginType.INPUT, + name.asJavaString(), + (RubyHash) args, + source ); } @SuppressWarnings("unchecked") @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return (AbstractOutputDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(), - source, (Map) args, pluginArgs + (RubyHash) args, source ); } @SuppressWarnings("unchecked") @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return (AbstractFilterDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(), - source, (Map) args, pluginArgs + (RubyHash) args, source ); } @SuppressWarnings("unchecked") @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { + public IRubyObject buildCodec(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - name.asJavaString(), source, (Map) args, pluginArgs + RubyUtil.RUBY.getCurrentContext(), + PluginLookup.PluginType.CODEC, + name.asJavaString(), + (RubyHash) args, + source ); } @Override public Codec buildDefaultCodec(String codecName) { return (Codec) JavaUtil.unwrapJavaValue(plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - codecName, null, Collections.emptyMap(), Collections.emptyMap() + RubyUtil.RUBY.getCurrentContext(), + PluginLookup.PluginType.CODEC, + codecName, + RubyHash.newHash(RubyUtil.RUBY), + null )); } @SuppressWarnings("unchecked") @JRubyMethod(required = 3, optional = 1) public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) { + final SourceWithMetadata source = args.length > 3 ? (SourceWithMetadata) JavaUtil.unwrapIfJavaObject(args[3]) : null; + return plugin( context, PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)), args[1].asJavaString(), - JavaUtil.unwrapIfJavaObject(args[2]), - args.length > 3 ? (Map) args[3] : new HashMap<>(), - null + (RubyHash) args[2], + source ); } @SuppressWarnings("unchecked") - private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name, - SourceWithMetadata source, final Map args, - Map pluginArgs) { - final String id = generateOrRetrievePluginId(context, type, name, source); - pluginsById.add(id); + private IRubyObject plugin(final ThreadContext context, + final PluginLookup.PluginType type, + final String name, + final RubyHash args, + final SourceWithMetadata source) { + final String id = generateOrRetrievePluginId(type, source, args); + + if (id == null) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name + ) + ); + } + if (!pluginsById.add(id)) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format("Two plugins have the id '%s', please fix this conflict", id) + ); + } + final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel()); final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name); @@ -199,19 +249,19 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } else if (type == PluginLookup.PluginType.FILTER) { return filterDelegator( context, null, - filterClass, klass, rubyArgs, typeScopedMetric, executionCntx); + filterDelegatorClass, klass, rubyArgs, typeScopedMetric, executionCntx); } else { - final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs); + final IRubyObject pluginInstance = ContextualizerExt.initializePlugin(context, executionCntx, klass, rubyArgs); + final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id)); scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name")); pluginInstance.callMethod(context, "metric=", scopedMetric); - pluginInstance.callMethod(context, "execution_context=", executionCntx); return pluginInstance; } } else { - if (pluginArgs == null) { - String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." + - " The Java execution engine is required to run Java plugins.", name); + if (engine != Engine.JAVA) { + String err = String.format("Cannot start the Java plugin '%s' in the %s execution engine." + + " The Java execution engine is required to run Java plugins.", name, engine); throw new IllegalStateException(err); } @@ -221,38 +271,97 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } Context contextWithMetrics = executionContextFactory.toContext(type, metrics.getRoot(context)); - return pluginCreator.createDelegator(name, pluginArgs, id, typeScopedMetric, pluginClass, contextWithMetrics); + return pluginCreator.createDelegator(name, convertToJavaCoercible(args), id, typeScopedMetric, pluginClass, contextWithMetrics); } } - private String generateOrRetrievePluginId(ThreadContext context, PluginLookup.PluginType type, String name, - SourceWithMetadata source) { - final String id; - if (type == PluginLookup.PluginType.CODEC) { - id = UUID.randomUUID().toString(); + private Map convertToJavaCoercible(Map input) { + final Map output = new HashMap<>(input); + + // Intercept Codecs + for (final Map.Entry entry : input.entrySet()) { + final String key = entry.getKey(); + final Object value = entry.getValue(); + if (value instanceof IRubyObject) { + final Object unwrapped = JavaUtil.unwrapJavaValue((IRubyObject) value); + if (unwrapped instanceof Codec) { + output.put(key, unwrapped); + } + } + } + + return output; + } + + // TODO: caller seems to think that the args is `Map`, but + // at least any `id` present is actually a `String`. + private String generateOrRetrievePluginId(final PluginLookup.PluginType type, + final SourceWithMetadata source, + final Map args) { + final Optional unprocessedId; + if (source == null) { + unprocessedId = extractId(() -> extractIdFromArgs(args), + this::generateUUID); } else { - String unresolvedId = lir.getGraph().vertices() - .filter(v -> v.getSourceWithMetadata() != null - && v.getSourceWithMetadata().equalsWithoutText(source)) - .findFirst() - .map(Vertex::getId).orElse(null); - id = (String) configVariables.expand(unresolvedId); + unprocessedId = extractId(() -> extractIdFromLIR(source), + () -> extractIdFromArgs(args), + () -> generateUUIDForCodecs(type)); } - if (id == null) { - throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format( - "Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name - ) - ); + + return unprocessedId + .map(configVariables::expand) + .filter(String.class::isInstance) + .map(String.class::cast) + .orElse(null); + } + + private Optional extractId(final IdExtractor... extractors) { + for (IdExtractor extractor : extractors) { + final Optional extracted = extractor.extract(); + if (extracted.isPresent()) { + return extracted; + } } - if (pluginsById.contains(id)) { - throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format("Two plugins have the id '%s', please fix this conflict", id) - ); + return Optional.empty(); + } + + @FunctionalInterface + interface IdExtractor { + Optional extract(); + } + + private Optional extractIdFromArgs(final Map args) { + if (!args.containsKey("id")) { + return Optional.empty(); + } + + final Object explicitId = args.get("id"); + if (explicitId instanceof String) { + return Optional.of((String) explicitId); + } else if (explicitId instanceof RubyString) { + return Optional.of(((RubyString) explicitId).asJavaString()); + } else { + return Optional.empty(); } - return id; + } + + private Optional generateUUID() { + return Optional.of(UUID.randomUUID().toString()); + } + + private Optional generateUUIDForCodecs(final PluginLookup.PluginType pluginType) { + if (pluginType == PluginLookup.PluginType.CODEC) { + return generateUUID(); + } + return Optional.empty(); + } + + private Optional extractIdFromLIR(final SourceWithMetadata source) { + return lir.getGraph().vertices() + .filter(v -> v.getSourceWithMetadata() != null + && v.getSourceWithMetadata().equalsWithoutText(source)) + .findFirst() + .map(Vertex::getId); } ExecutionContextFactoryExt getExecutionContextFactory() { diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index 0ac7195273f..bf93b73d18a 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -54,12 +54,8 @@ import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import org.logstash.config.ir.compiler.ComputeStepSyntaxElement; import org.logstash.config.ir.compiler.FilterDelegatorExt; -import org.logstash.config.ir.compiler.PluginFactory; +import org.logstash.config.ir.compiler.RubyIntegration; import org.logstash.ext.JrubyEventExtLibrary; -import co.elastic.logstash.api.Configuration; -import co.elastic.logstash.api.Filter; -import co.elastic.logstash.api.Input; -import co.elastic.logstash.api.Context; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -494,9 +490,9 @@ private Supplier>> mockOutpu } /** - * Configurable Mock {@link PluginFactory} + * Configurable Mock {@link RubyIntegration.PluginFactory} */ - static final class MockPluginFactory implements PluginFactory { + static final class MockPluginFactory implements RubyIntegration.PluginFactory { private final Map> inputs; @@ -514,20 +510,20 @@ static final class MockPluginFactory implements PluginFactory { } @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(final RubyString name, final IRubyObject args, + SourceWithMetadata source) { return setupPlugin(name, inputs); } @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(final RubyString name, final IRubyObject args, + SourceWithMetadata source) { return PipelineTestUtil.buildOutput(setupPlugin(name, outputs)); } @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(final RubyString name, final IRubyObject args, + SourceWithMetadata source) { final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name); return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) @@ -535,8 +531,7 @@ public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithM } @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { + public IRubyObject buildCodec(final RubyString name, final IRubyObject args, SourceWithMetadata source) { throw new IllegalStateException("No codec setup expected in this test."); } @@ -555,17 +550,6 @@ private static T setupPlugin(final RubyString name, } return suppliers.get(name.asJavaString()).get(); } - - @Override - public Input buildInput(final String name, final String id, final Configuration configuration, final Context context) { - return null; - } - - @Override - public Filter buildFilter(final String name, final String id, - final Configuration configuration, final Context context) { - return null; - } } @Test @@ -698,9 +682,9 @@ private String createBigFilterSection(int numFilters) { } /** - * Fixed Mock {@link PluginFactory} + * Fixed Mock {@link RubyIntegration.PluginFactory} * */ - static final class FixedPluginFactory implements PluginFactory { + static final class FixedPluginFactory implements RubyIntegration.PluginFactory { private Supplier input; private Supplier filter; @@ -714,27 +698,17 @@ static final class FixedPluginFactory implements PluginFactory { } @Override - public Input buildInput(String name, String id, Configuration configuration, Context context) { - return null; - } - - @Override - public Filter buildFilter(String name, String id, Configuration configuration, Context context) { - return null; - } - - @Override - public IRubyObject buildInput(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(RubyString name, IRubyObject args, SourceWithMetadata source) { return this.input.get(); } @Override - public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, SourceWithMetadata source) { return PipelineTestUtil.buildOutput(this.output.get()); } @Override - public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, SourceWithMetadata source) { final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name); return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) @@ -742,7 +716,7 @@ public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadat } @Override - public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source) { return null; } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java deleted file mode 100644 index ebc3990690d..00000000000 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.logstash.config.ir.compiler; - -import org.junit.Test; - -/** - * Tests for {@link PluginFactory.Default}. - */ -public final class PluginFactoryTest { - - @Test - public void testBuildJavaFilter() throws Exception { - - } - -} diff --git a/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java b/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java index ca2c2f02c8c..2476b4c0b8e 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java +++ b/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java @@ -30,31 +30,29 @@ import org.logstash.plugins.codecs.Line; import java.util.Collections; -import java.util.Map; public class TestPluginFactory implements RubyIntegration.PluginFactory { @Override - public IRubyObject buildInput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(RubyString name, IRubyObject args, + SourceWithMetadata source) { return null; } @Override - public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, + SourceWithMetadata source) { return null; } @Override - public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, + SourceWithMetadata source) { return null; } @Override - public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, - Map pluginArgs) { + public IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source) { return null; } diff --git a/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java b/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java index ded3049a657..1f621b90b85 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java @@ -30,6 +30,7 @@ import org.logstash.config.ir.InvalidIRException; import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.RubyEnvTestCase; +import org.logstash.execution.Engine; import org.logstash.instrument.metrics.NamespacedMetricExt; import org.logstash.plugins.MetricTestCase; import org.logstash.plugins.PluginLookup; @@ -93,13 +94,12 @@ public void testPluginIdResolvedWithEnvironmentVariables() throws InvalidIRExcep envVars.put("CUSTOM", "test"); PluginFactoryExt sut = new PluginFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS, mockPluginResolver); - sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get); + sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get, Engine.JAVA); RubyString pluginName = RubyUtil.RUBY.newString("mockinput"); // Exercise - IRubyObject pluginInstance = sut.buildInput(pluginName, sourceWithMetadata, RubyHash.newHash(RubyUtil.RUBY), - Collections.emptyMap()); + IRubyObject pluginInstance = sut.buildInput(pluginName, RubyHash.newHash(RubyUtil.RUBY), sourceWithMetadata); //Verify IRubyObject id = pluginInstance.callMethod(RUBY.getCurrentContext(), "id");