From ba73478e4d6b3dc1ce8341f714ebcffd27165ad7 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 6 Oct 2020 06:45:46 -0700 Subject: [PATCH] ECS Compatibility (#12305) Implements a plugin `ecs_compatibility` option, whose default value is powered by the pipeline-level setting `pipeline.ecs_compatibility`, in line with the proposal in elastic/logstash#11623: In order to increase the confidence a user has when upgrading Logstash, this implementation uses the deprecation logger to warn when `ecs_compatibility` is used without an explicit directive. For now, as we continue to add ECS Compatibility Modes, an opting into a specific ECS Compatibility mode at a pipeline level is considered a BETA feature. All plugins using the [ECS Compatibility Support][] adapter will use the setting correctly, but pipelines configured in this way do not guarantee consistent behaviour across minor versions of Logstash or the plugins it bundles (e.g., upgraded plugins that have newly-implemented an ECS Compatibility mode will use the pipeline-level setting as a default, causing them to potentially behave differently after the upgrade). This change-set also includes a significant amount of work within the `PluginFactory`, which allows us to ensure that pipeline-level settings are available to a Logstash plugin _before_ its `initialize` is executed, including the maintaining of context for codecs that are routinely cloned. * JEE: instantiate codecs only once * PluginFactory: use passed FilterDelegator class * PluginFactory: require engine name in init * NOOP: remove useless secondary plugin factory interface * PluginFactory: simplify, compute java args only when necessary * PluginFactory: accept explicit id when vertex unavailable * PluginFactory: make source optional, args required * PluginFactory: threadsafe refactor of id duplicate tracking * PluginFactory: make id extraction/geration more abstract/understandable * PluginFactory: extract or generate ID when source not available * PluginFactory: inject ExecutionContext before initializing plugins * Codec: propagate execution_context and metric to clones * Plugin: intercept string-specified codecs and propagate execution_context * Plugin: implement `ecs_compatibility` for all plugins * Plugin: deprecate use of `Config::Mixin::DSL::validate_value(String, :codec)` --- docker/data/logstash/env2yaml/env2yaml.go | 1 + logstash-core/lib/logstash/agent.rb | 8 + logstash-core/lib/logstash/codecs/base.rb | 4 +- .../lib/logstash/config/config_ast.rb | 6 +- logstash-core/lib/logstash/config/mixin.rb | 19 +- logstash-core/lib/logstash/environment.rb | 1 + logstash-core/lib/logstash/inputs/base.rb | 8 +- logstash-core/lib/logstash/outputs/base.rb | 8 +- logstash-core/lib/logstash/pipeline.rb | 4 +- logstash-core/lib/logstash/plugin.rb | 16 +- .../plugins/ecs_compatibility_support.rb | 53 ++++ logstash-core/lib/logstash/runner.rb | 5 + logstash-core/lib/logstash/settings.rb | 1 + logstash-core/locales/en.yml | 15 ++ .../spec/logstash/config/mixin_spec.rb | 25 ++ .../execution_context_factory_spec.rb | 59 ++++ .../spec/logstash/execution_context_spec.rb | 8 +- .../spec/logstash/inputs/base_spec.rb | 40 ++- .../spec/logstash/outputs/base_spec.rb | 34 ++- logstash-core/spec/logstash/plugin_spec.rb | 85 +++++- logstash-core/spec/support/shared_contexts.rb | 3 +- .../src/main/java/org/logstash/RubyUtil.java | 23 +- .../logstash/config/ir/CompiledPipeline.java | 78 ++---- .../config/ir/compiler/OutputStrategyExt.java | 25 +- .../config/ir/compiler/PluginFactory.java | 90 ------- .../config/ir/compiler/RubyIntegration.java | 12 +- .../java/org/logstash/execution/Engine.java | 7 + .../execution/ExecutionContextExt.java | 14 +- .../execution/JavaBasePipelineExt.java | 3 +- .../plugins/factory/ContextualizerExt.java | 99 +++++++ .../factory/ExecutionContextFactoryExt.java | 6 +- .../plugins/factory/PluginFactoryExt.java | 251 +++++++++++++----- .../config/ir/CompiledPipelineTest.java | 58 ++-- .../config/ir/compiler/PluginFactoryTest.java | 35 --- .../logstash/plugins/TestPluginFactory.java | 16 +- .../plugins/factory/PluginFactoryExtTest.java | 6 +- 36 files changed, 754 insertions(+), 372 deletions(-) create mode 100644 logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb create mode 100644 logstash-core/spec/logstash/execution_context_factory_spec.rb delete mode 100644 logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java create mode 100644 logstash-core/src/main/java/org/logstash/execution/Engine.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java delete mode 100644 logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go index 82483aae853..860d4da8fc4 100644 --- a/docker/data/logstash/env2yaml/env2yaml.go +++ b/docker/data/logstash/env2yaml/env2yaml.go @@ -57,6 +57,7 @@ func normalizeSetting(setting string) (string, error) { "pipeline.batch.delay", "pipeline.unsafe_shutdown", "pipeline.java_execution", + "pipeline.ecs_compatibility" "pipeline.plugin_classloaders", "path.config", "config.string", diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index f05a3b9f7f6..d1f4743f999 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -75,6 +75,14 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) logger.warn("deprecated setting `config.field_reference.parser` set; field reference parsing is strict by default") end + if @settings.set?('pipeline.ecs_compatibility') + ecs_compatibility_value = settings.get('pipeline.ecs_compatibility') + if ecs_compatibility_value != 'disabled' + logger.warn("Setting `pipeline.ecs_compatibility` given as `#{ecs_compatibility_value}`; " + + "values other than `disabled` are currently considered BETA and may have unintended consequences when upgrading minor versions of Logstash.") + end + end + # This is for backward compatibility in the tests if source_loader.nil? @source_loader = LogStash::Config::SourceLoader.new diff --git a/logstash-core/lib/logstash/codecs/base.rb b/logstash-core/lib/logstash/codecs/base.rb index db4cde703ce..081a3b376e3 100644 --- a/logstash-core/lib/logstash/codecs/base.rb +++ b/logstash-core/lib/logstash/codecs/base.rb @@ -92,6 +92,8 @@ def flush(&block) public def clone - return self.class.new(params) + LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, self.class, params).tap do |klone| + klone.metric = @metric if klone.instance_variable_get(:@metric).nil? + end end end; end # class LogStash::Codecs::Base diff --git a/logstash-core/lib/logstash/config/config_ast.rb b/logstash-core/lib/logstash/config/config_ast.rb index e3d413bf5e4..c7c5f9cfee0 100644 --- a/logstash-core/lib/logstash/config/config_ast.rb +++ b/logstash-core/lib/logstash/config/config_ast.rb @@ -249,12 +249,12 @@ def compile_initializer # If any parent is a Plugin, this must be a codec. if attributes.elements.nil? - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, {}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n") else settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" << (plugin_type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n") end end @@ -271,7 +271,7 @@ def compile when "codec" settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" end end diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index a8febea0b0e..38ef99b9d8b 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -49,6 +49,7 @@ module LogStash::Config::Mixin include LogStash::Util::SubstitutionVariables + include LogStash::Util::Loggable attr_accessor :config attr_accessor :original_params @@ -99,6 +100,17 @@ def config_init(params) params[name.to_s] = deep_replace(value) end + # Intercept codecs that have not been instantiated + params.each do |name, value| + validator = self.class.validator_find(name) + next unless validator && validator[:validate] == :codec && value.kind_of?(String) + + codec_klass = LogStash::Plugin.lookup("codec", value) + codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass) + + params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance) + end + if !self.class.validate(params) raise LogStash::ConfigurationError, I18n.t("logstash.runner.configuration.invalid_plugin_settings") @@ -190,7 +202,7 @@ def config(name, opts={}) name = name.to_s if name.is_a?(Symbol) @config[name] = opts # ok if this is empty - if name.is_a?(String) + if name.is_a?(String) && opts.fetch(:attr_accessor, true) define_method(name) { instance_variable_get("@#{name}") } define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) } end @@ -429,6 +441,11 @@ def validate_value(value, validator) case validator when :codec if value.first.is_a?(String) + # A plugin's codecs should be instantiated by `PluginFactory` or in `Config::Mixin#config_init(Hash)`, + # which ensure the inner plugin has access to the outer's execution context and metric store. + # This deprecation exists to warn plugins that call `Config::Mixin::validate_value` directly. + self.deprecation_logger.deprecated("Codec instantiated by `Config::Mixin::DSL::validate_value(String, :codec)` which cannot propagate parent plugin's execution context or metrics. ", + self.logger.debug? ? {:backtrace => caller} : {}) value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new return true, value else diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 89d5ad1c86d..72e1f6d9599 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -62,6 +62,7 @@ module Environment Setting::Boolean.new("pipeline.plugin_classloaders", false), Setting::Boolean.new("pipeline.separate_logs", false), Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]), + Setting::CoercibleString.new("pipeline.ecs_compatibility", "disabled", true, %w(disabled v1 v2)), Setting.new("path.plugins", Array, []), Setting::NullableString.new("interactive", nil, false), Setting::Boolean.new("config.debug", false), diff --git a/logstash-core/lib/logstash/inputs/base.rb b/logstash-core/lib/logstash/inputs/base.rb index a01b011e3f8..3ea40a339b9 100644 --- a/logstash-core/lib/logstash/inputs/base.rb +++ b/logstash-core/lib/logstash/inputs/base.rb @@ -124,10 +124,10 @@ def metric=(metric) def execution_context=(context) super - # There is no easy way to propage an instance variable into the codec, because the codec - # are created at the class level - # TODO(talevy): Codecs should have their own execution_context, for now they will inherit their - # parent plugin's + # Setting the execution context after initialization is deprecated and will be removed in + # a future release of Logstash. While this code is no longer executed from Logstash core, + # we continue to propagate a set execution context to an input's codec, and rely on super's + # deprecation warning. @codec.execution_context = context context end diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index c52dad18f9b..ed3a2827224 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -127,10 +127,10 @@ def metric=(metric) def execution_context=(context) super - # There is no easy way to propage an instance variable into the codec, because the codec - # are created at the class level - # TODO(talevy): Codecs should have their own execution_context, for now they will inherit their - # parent plugin's + # Setting the execution context after initialization is deprecated and will be removed in + # a future release of Logstash. While this code is no longer executed from Logstash core, + # we continue to propagate a set execution context to an output's codec, and rely on super's + # deprecation warning. @codec.execution_context = context context end diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 54089d5180a..6b3ec1733eb 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -89,8 +89,8 @@ def non_reloadable_plugins private - def plugin(plugin_type, name, source, *args) - @plugin_factory.plugin(plugin_type, name, source, *args) + def plugin(plugin_type, name, args, source) + @plugin_factory.plugin(plugin_type, name, args, source) end def default_logging_keys(other_keys = {}) diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index b376bd7d71f..a21e183929c 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -16,6 +16,7 @@ # under the License. require "logstash/config/mixin" +require "logstash/plugins/ecs_compatibility_support" require "concurrent" require "securerandom" @@ -24,11 +25,12 @@ class LogStash::Plugin include LogStash::Util::Loggable - attr_accessor :params, :execution_context + attr_accessor :params NL = "\n" include LogStash::Config::Mixin + include LogStash::Plugins::ECSCompatibilitySupport # Disable or enable metric logging for this specific plugin instance # by default we record all the metrics we can, but you can disable metrics collection @@ -60,7 +62,7 @@ def eql?(other) self.class.name == other.class.name && @params == other.params end - def initialize(params=nil) + def initialize(params={}) @logger = self.logger @deprecation_logger = self.deprecation_logger # need to access settings statically because plugins are initialized in config_ast with no context. @@ -177,4 +179,14 @@ def self.lookup(type, name) def plugin_metadata LogStash::PluginMetadata.for_plugin(self.id) end + + # Deprecated attr_writer for execution_context + def execution_context=(new_context) + @deprecation_logger.deprecated("LogStash::Plugin#execution_context=(new_ctx) is deprecated. Use LogStash::Plugins::Contextualizer#initialize_plugin(new_ctx, klass, args) instead", :caller => caller.first) + @execution_context = new_context + end + + def execution_context + @execution_context || LogStash::ExecutionContext::Empty + end end # class LogStash::Plugin diff --git a/logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb b/logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb new file mode 100644 index 00000000000..f87aff81c67 --- /dev/null +++ b/logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb @@ -0,0 +1,53 @@ +module LogStash + module Plugins + module ECSCompatibilitySupport + def self.included(base) + base.extend(ArgumentValidator) + base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument, + :attr_accessor => false) + end + + MUTEX = Mutex.new + private_constant :MUTEX + + def ecs_compatibility + @_ecs_compatibility || MUTEX.synchronize do + @_ecs_compatibility ||= begin + # use config_init-set value if present + break @ecs_compatibility unless @ecs_compatibility.nil? + + pipeline = execution_context.pipeline + pipeline_settings = pipeline && pipeline.settings + pipeline_settings ||= LogStash::SETTINGS + + if !pipeline_settings.set?('pipeline.ecs_compatibility') + deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " + + "To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.") + end + + pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym + end + end + end + + module ArgumentValidator + V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze + private_constant :V_PREFIXED_INTEGER_PATTERN + + def validate_value(value, validator) + return super unless validator == :ecs_compatibility_argument + + value = deep_replace(value) + value = hash_or_array(value) + + if value.size == 1 + return true, :disabled if value.first.to_s == 'disabled' + return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN + end + + return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}" + end + end + end + end +end diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 0b5304e5350..9a0cf1bcf56 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -154,6 +154,11 @@ class LogStash::Runner < Clamp::StrictCommand :attribute_name => "pipeline.unsafe_shutdown", :default => LogStash::SETTINGS.get_default("pipeline.unsafe_shutdown") + option ["--pipeline.ecs_compatibility"], "STRING", + I18n.t("logstash.runner.flag.ecs_compatibility"), + :attribute_name => "pipeline.ecs_compatibility", + :default => LogStash::SETTINGS.get_default('pipeline.ecs_compatibility') + # Data Path Setting option ["--path.data"] , "PATH", I18n.t("logstash.runner.flag.datapath"), diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index 5f648c1e1dd..96270911c1e 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -48,6 +48,7 @@ class Settings "pipeline.system", "pipeline.workers", "pipeline.ordered", + "pipeline.ecs_compatibility", "queue.checkpoint.acks", "queue.checkpoint.interval", "queue.checkpoint.writes", diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index 25a8304e2b7..8890d9ecd0f 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -362,6 +362,21 @@ en: if there are still inflight events in memory. By default, logstash will refuse to quit until all received events have been pushed to the outputs. + ecs_compatibility: |+ + Sets the pipeline's default value for `ecs_compatibility`, + a setting that is available to plugins that implement + an ECS Compatibility mode for use with the Elastic Common + Schema. + Possible values are: + - disabled (default) + - v1 + - v2 + This option allows the early opt-in (or preemptive opt-out) + of ECS Compatibility modes in plugins, which is scheduled to + be on-by-default in a future major release of Logstash. + + Values other than `disabled` are currently considered BETA, + and may produce unintended consequences when upgrading Logstash. rubyshell: |+ Drop to shell instead of running as normal. Valid shells are "irb" and "pry" diff --git a/logstash-core/spec/logstash/config/mixin_spec.rb b/logstash-core/spec/logstash/config/mixin_spec.rb index 3fa8f09659e..60c2611363e 100644 --- a/logstash-core/spec/logstash/config/mixin_spec.rb +++ b/logstash-core/spec/logstash/config/mixin_spec.rb @@ -47,6 +47,31 @@ end end + context 'DSL::validate_value(String, :codec)' do + subject(:plugin_class) { Class.new(LogStash::Filters::Base) { config_name "test_deprecated_two" } } + let(:codec_class) { Class.new(LogStash::Codecs::Base) { config_name 'dummy' } } + let(:deprecation_logger) { double("DeprecationLogger").as_null_object } + + before(:each) do + allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger) + allow(LogStash::Plugin).to receive(:lookup).with("codec", codec_class.config_name).and_return(codec_class) + end + + it 'instantiates the codec' do + success, codec = plugin_class.validate_value(codec_class.config_name, :codec) + + expect(success).to be true + expect(codec.class).to eq(codec_class) + end + + it 'logs a deprecation' do + plugin_class.validate_value(codec_class.config_name, :codec) + expect(deprecation_logger).to have_received(:deprecated) do |message| + expect(message).to include("validate_value(String, :codec)") + end + end + end + context "when validating :bytes successfully" do subject do local_num_bytes = num_bytes # needs to be locally scoped :( diff --git a/logstash-core/spec/logstash/execution_context_factory_spec.rb b/logstash-core/spec/logstash/execution_context_factory_spec.rb new file mode 100644 index 00000000000..e20fd6cb68b --- /dev/null +++ b/logstash-core/spec/logstash/execution_context_factory_spec.rb @@ -0,0 +1,59 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" + +describe LogStash::Plugins::ExecutionContextFactory do + let(:pipeline) { double('Pipeline') } + let(:agent) { double('Agent') } + let(:inner_dlq_writer) { nil } + + subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) } + + context '#create' do + let(:plugin_id) { SecureRandom.uuid } + let(:plugin_type) { 'input' } + + context 'the resulting instance' do + subject(:instance) { factory.create(plugin_id, plugin_type) } + + it 'retains the pipeline from the factory' do + expect(instance.pipeline).to be(pipeline) + end + + it 'retains the agent from the factory' do + expect(instance.agent).to be(agent) + end + + it 'has a dlq_writer' do + expect(instance.dlq_writer).to_not be_nil + end + + context 'dlq_writer' do + subject(:instance_dlq_writer) { instance.dlq_writer } + + it 'retains the plugin id' do + expect(instance_dlq_writer.plugin_id).to eq(plugin_id) + end + + it 'retains the plugin type' do + expect(instance_dlq_writer.plugin_type).to eq(plugin_type) + end + end + end + end +end diff --git a/logstash-core/spec/logstash/execution_context_spec.rb b/logstash-core/spec/logstash/execution_context_spec.rb index d1e529cbf92..4ec9acb21a3 100644 --- a/logstash-core/spec/logstash/execution_context_spec.rb +++ b/logstash-core/spec/logstash/execution_context_spec.rb @@ -30,7 +30,7 @@ allow(pipeline).to receive(:pipeline_id).and_return(pipeline_id) end - subject { described_class.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) } + subject { described_class.new(pipeline, agent, dlq_writer) } it "returns the `pipeline_id`" do expect(subject.pipeline_id).to eq(pipeline_id) @@ -44,9 +44,7 @@ expect(subject.agent).to eq(agent) end - it "returns the plugin-specific dlq writer" do - expect(subject.dlq_writer.plugin_type).to eq(plugin_type) - expect(subject.dlq_writer.plugin_id).to eq(plugin_id) - expect(subject.dlq_writer.inner_writer).to eq(dlq_writer) + it "returns the dlq writer" do + expect(subject.dlq_writer).to be(dlq_writer) end end diff --git a/logstash-core/spec/logstash/inputs/base_spec.rb b/logstash-core/spec/logstash/inputs/base_spec.rb index 9540bbe384e..ecb34c0d164 100644 --- a/logstash-core/spec/logstash/inputs/base_spec.rb +++ b/logstash-core/spec/logstash/inputs/base_spec.rb @@ -86,18 +86,34 @@ def register; end subject(:instance) { klass.new({}) } - it "allow to set the context" do - expect(instance.execution_context).to be_nil - instance.execution_context = execution_context - - expect(instance.execution_context).to eq(execution_context) - end - - it "propagate the context to the codec" do - expect(instance.codec.execution_context).to be_nil - instance.execution_context = execution_context - - expect(instance.codec.execution_context).to eq(execution_context) + context 'execution_context=' do + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end + + it "allow to set the context" do + new_ctx = execution_context.dup + subject.execution_context = new_ctx + expect(subject.execution_context).to be(new_ctx) + end + + it "propagate the context to the codec" do + new_ctx = execution_context.dup + expect(instance.codec.execution_context).to_not be(new_ctx) + instance.execution_context = new_ctx + + expect(instance.execution_context).to be(new_ctx) + expect(instance.codec.execution_context).to be(new_ctx) + end + + it 'emits a deprecation warning' do + expect(deprecation_logger_stub).to receive(:deprecated) do |message| + expect(message).to match(/execution_context=/) + end + + instance.execution_context = execution_context + end end end diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 191d8275178..0e7df6d521f 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -102,18 +102,34 @@ def multi_receive_encoded(events_and_encoded) subject(:instance) { klass.new(params.dup) } - it "allow to set the context" do - expect(instance.execution_context).to be_nil - instance.execution_context = execution_context + context 'execution_context=' do + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end - expect(instance.execution_context).to eq(execution_context) - end + it "allow to set the context" do + new_ctx = execution_context.dup + subject.execution_context = new_ctx + expect(subject.execution_context).to be(new_ctx) + end - it "propagate the context to the codec" do - expect(instance.codec.execution_context).to be_nil - instance.execution_context = execution_context + it "propagate the context to the codec" do + new_ctx = execution_context.dup + expect(instance.codec.execution_context).to_not be(new_ctx) + instance.execution_context = new_ctx - expect(instance.codec.execution_context).to eq(execution_context) + expect(instance.execution_context).to be(new_ctx) + expect(instance.codec.execution_context).to be(new_ctx) + end + + it 'emits a deprecation warning' do + expect(deprecation_logger_stub).to receive(:deprecated) do |message| + expect(message).to match(/execution_context=/) + end + + instance.execution_context = execution_context + end end end diff --git a/logstash-core/spec/logstash/plugin_spec.rb b/logstash-core/spec/logstash/plugin_spec.rb index e56299dd582..e286e2950b6 100644 --- a/logstash-core/spec/logstash/plugin_spec.rb +++ b/logstash-core/spec/logstash/plugin_spec.rb @@ -69,13 +69,28 @@ def self.reloadable? end context "#execution_context" do - subject { Class.new(LogStash::Plugin).new({}) } + let(:klass) { Class.new(LogStash::Plugin) } + subject(:instance) { klass.new({}) } include_context "execution_context" - it "can be set and get" do - expect(subject.execution_context).to be_nil - subject.execution_context = execution_context - expect(subject.execution_context).to eq(execution_context) + context 'execution_context=' do + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(klass).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end + + it "can be set and get" do + new_ctx = execution_context.dup + subject.execution_context = new_ctx + expect(subject.execution_context).to eq(new_ctx) + end + + it 'emits a deprecation warning' do + expect(deprecation_logger_stub).to receive(:deprecated) do |message| + expect(message).to match(/execution_context=/) + end + instance.execution_context = execution_context + end end end @@ -402,6 +417,66 @@ def register; end end end + describe "#ecs_compatibility" do + let(:plugin_class) do + Class.new(LogStash::Filters::Base) do + config_name "ecs_validator_sample" + def register; end + end + end + let(:config) { Hash.new } + let(:instance) { plugin_class.new(config) } + + let(:deprecation_logger_stub) { double('DeprecationLogger').as_null_object } + before(:each) do + allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger_stub) + end + + context 'when plugin initialized with explicit value' do + let(:config) { super().merge("ecs_compatibility" => "v17") } + it 'returns the explicitly-given value' do + expect(instance.ecs_compatibility).to eq(:v17) + end + end + + context 'when plugin is not initialized with an explicit value' do + let(:settings_stub) { LogStash::SETTINGS.clone } + + before(:each) do + allow(settings_stub).to receive(:get_value).with(anything).and_call_original # allow spies + stub_const('LogStash::SETTINGS', settings_stub) + end + + context 'and pipeline-level setting is explicitly `v1`' do + let(:settings_stub) do + super().tap do |settings| + settings.set_value('pipeline.ecs_compatibility', 'v1') + end + end + it 'reads the setting' do + expect(instance.ecs_compatibility).to eq(:v1) + + expect(settings_stub).to have_received(:get_value) + end + end + + context 'and pipeline-level setting is not specified' do + it 'emits a deprecation warning about using the default which may change' do + instance.ecs_compatibility + + expect(deprecation_logger_stub).to have_received(:deprecated) do |message| + expect(message).to include("Relying on default value of `pipeline.ecs_compatibility`") + end + end + it 'returns `disabled`' do + # Default value of `pipeline.ecs_compatibility` + expect(instance.ecs_compatibility).to eq(:disabled) + end + end + end + + end + describe "deprecation logger" do let(:config) do { diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb index 0dce1799339..042869df574 100644 --- a/logstash-core/spec/support/shared_contexts.rb +++ b/logstash-core/spec/support/shared_contexts.rb @@ -22,8 +22,9 @@ let(:plugin_id) { :plugin_id } let(:plugin_type) { :plugin_type } let(:dlq_writer) { double("dlq_writer") } + let(:execution_context_factory) { ::LogStash::Plugins::ExecutionContextFactory.new(agent, pipeline, dlq_writer) } let(:execution_context) do - ::LogStash::ExecutionContext.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) + execution_context_factory.create(plugin_id, plugin_type) end before do diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index d802679235b..abf86480298 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -26,6 +26,7 @@ import org.jruby.anno.JRubyClass; import org.jruby.exceptions.RaiseException; import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.Block; import org.jruby.runtime.ObjectAllocator; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.ackedqueue.QueueFactoryExt; @@ -74,6 +75,7 @@ import org.logstash.log.SlowLoggerExt; import org.logstash.plugins.HooksRegistryExt; import org.logstash.plugins.UniversalPluginExt; +import org.logstash.plugins.factory.ContextualizerExt; import org.logstash.util.UtilExt; import org.logstash.plugins.factory.ExecutionContextFactoryExt; import org.logstash.plugins.factory.PluginMetricsFactoryExt; @@ -199,6 +201,8 @@ public final class RubyUtil { public static final RubyClass PLUGIN_FACTORY_CLASS; + public static final RubyModule PLUGIN_CONTEXTUALIZER_MODULE; + public static final RubyClass LOGGER; public static final RubyModule LOGGABLE_MODULE; @@ -333,7 +337,7 @@ public final class RubyUtil { UTIL_MODULE = LOGSTASH_MODULE.defineModuleUnder("Util"); UTIL_MODULE.defineAnnotatedMethods(UtilExt.class); ABSTRACT_DLQ_WRITER_CLASS = UTIL_MODULE.defineClassUnder( - "AbstractDeadLetterQueueWriterExt", RUBY.getObject(), + "AbstractDeadLetterQueueWriter", RUBY.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR ); ABSTRACT_DLQ_WRITER_CLASS.defineAnnotatedMethods(AbstractDeadLetterQueueWriterExt.class); @@ -401,6 +405,7 @@ public final class RubyUtil { EXECUTION_CONTEXT_CLASS = setupLogstashClass( ExecutionContextExt::new, ExecutionContextExt.class ); + EXECUTION_CONTEXT_CLASS.defineConstant("Empty", EXECUTION_CONTEXT_CLASS.newInstance(RUBY.getCurrentContext(), RUBY.getNil(), RUBY.getNil(), RUBY.getNil(), Block.NULL_BLOCK)); RUBY_TIMESTAMP_CLASS = setupLogstashClass( JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class ); @@ -551,6 +556,8 @@ public final class RubyUtil { "PluginFactory", RUBY.getObject(), PluginFactoryExt::new ); PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.class); + PLUGIN_CONTEXTUALIZER_MODULE = PLUGINS_MODULE.defineOrGetModuleUnder("Contextualizer"); + PLUGIN_CONTEXTUALIZER_MODULE.defineAnnotatedMethods(ContextualizerExt.class); UNIVERSAL_PLUGIN_CLASS = setupLogstashClass(UniversalPluginExt::new, UniversalPluginExt.class); EVENT_DISPATCHER_CLASS = @@ -636,4 +643,18 @@ public static IRubyObject toRubyObject(Object javaObject) { return JavaUtil.convertJavaToRuby(RUBY, javaObject); } + /** + * Cast an IRubyObject that may be nil to a specific class + * @param objectOrNil an object of either type {@code } or nil. + * @param the type to cast non-nil values to + * @return The given value, cast to {@code }, or null. + */ + public static T nilSafeCast(final IRubyObject objectOrNil) { + if (objectOrNil == null || objectOrNil.isNil()) { return null; } + + @SuppressWarnings("unchecked") + final T objectAsCasted = (T) objectOrNil; + + return objectAsCasted; + } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 092e1c0e578..026ef64c999 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -164,8 +164,9 @@ private Map setupOutputs(ConfigVariableExpan outs.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); + final Map args = expandArguments(def, cve); res.put(v.getId(), pluginFactory.buildOutput( - RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve) + RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source )); }); return res; @@ -181,8 +182,9 @@ private Map setupFilters(ConfigVariableExpan for (final PluginVertex vertex : filterPlugins) { final PluginDefinition def = vertex.getPluginDefinition(); final SourceWithMetadata source = vertex.getSourceWithMetadata(); + final Map args = expandArguments(def, cve); res.put(vertex.getId(), pluginFactory.buildFilter( - RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve) + RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source )); } return res; @@ -197,71 +199,47 @@ private Collection setupInputs(ConfigVariableExpander cve) { vertices.forEach(v -> { final PluginDefinition def = v.getPluginDefinition(); final SourceWithMetadata source = v.getSourceWithMetadata(); + final Map args = expandArguments(def, cve); IRubyObject o = pluginFactory.buildInput( - RubyUtil.RUBY.newString(def.getName()), source, convertArgs(def), convertJavaArgs(def, cve)); + RubyUtil.RUBY.newString(def.getName()), convertArgs(args), source); nodes.add(o); }); return nodes; } - /** - * Converts plugin arguments from the format provided by {@link PipelineIR} into coercible - * Ruby types. - * @param def PluginDefinition as provided by {@link PipelineIR} - * @return RubyHash of plugin arguments as understood by {@link RubyIntegration.PluginFactory} - * methods - */ - private RubyHash convertArgs(final PluginDefinition def) { + + final RubyHash convertArgs(final Map input) { final RubyHash converted = RubyHash.newHash(RubyUtil.RUBY); - for (final Map.Entry entry : def.getArguments().entrySet()) { + for (final Map.Entry entry : input.entrySet()) { final Object value = entry.getValue(); final String key = entry.getKey(); - final Object toput; - if (value instanceof PluginStatement) { - final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); - SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata(); - toput = pluginFactory.buildCodec( - RubyUtil.RUBY.newString(codec.getName()), - source, - Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), - codec.getArguments() - ); - } else { - toput = value; - } - converted.put(key, toput); + converted.put(key, value); } + return converted; } - /** - * Converts plugin arguments from the format provided by {@link PipelineIR} into coercible - * Java types for consumption by Java plugins. - * @param def PluginDefinition as provided by {@link PipelineIR} - * @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory} - * methods that create Java plugins - */ - private Map convertJavaArgs(final PluginDefinition def, ConfigVariableExpander cve) { - Map args = expandConfigVariables(cve, def.getArguments()); - for (final Map.Entry entry : args.entrySet()) { - final Object value = entry.getValue(); + + private Map expandArguments(final PluginDefinition pluginDefinition, final ConfigVariableExpander cve) { + Map arguments = expandConfigVariables(cve, pluginDefinition.getArguments()); + + // Intercept codec definitions from LIR + for (final Map.Entry entry : arguments.entrySet()) { final String key = entry.getKey(); - final IRubyObject toput; + final Object value = entry.getValue(); if (value instanceof PluginStatement) { - final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); - SourceWithMetadata source = ((PluginStatement) value).getSourceWithMetadata(); - Map codecArgs = expandConfigVariables(cve, codec.getArguments()); - toput = pluginFactory.buildCodec( - RubyUtil.RUBY.newString(codec.getName()), - source, - Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), - codecArgs - ); - Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput); - args.put(key, javaCodec); + final PluginStatement codecPluginStatement = (PluginStatement) value; + final PluginDefinition codecDefinition = codecPluginStatement.getPluginDefinition(); + final SourceWithMetadata codecSource = codecPluginStatement.getSourceWithMetadata(); + final Map codecArguments = expandArguments(codecDefinition, cve); + IRubyObject codecInstance = pluginFactory.buildCodec(RubyUtil.RUBY.newString(codecDefinition.getName()), + Rubyfier.deep(RubyUtil.RUBY, codecArguments), + codecSource); + arguments.put(key, codecInstance); } } - return args; + + return arguments; } @SuppressWarnings({"rawtypes", "unchecked"}) diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java index c5afaac0592..b50f7665240 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java @@ -32,9 +32,14 @@ import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; +import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.execution.ExecutionContextExt; +import org.logstash.plugins.factory.ContextualizerExt; + +import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE; public final class OutputStrategyExt { @@ -176,6 +181,9 @@ public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod(required = 4) public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { + final RubyClass outputClass = (RubyClass) args[0]; + final IRubyObject metric = args[1]; + final ExecutionContextExt executionContext = (ExecutionContextExt) args[2]; final RubyHash pluginArgs = (RubyHash) args[3]; workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers")); if (workerCount.isNil()) { @@ -185,12 +193,9 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject[] a workerQueue = new ArrayBlockingQueue<>(count); workers = context.runtime.newArray(count); for (int i = 0; i < count; ++i) { - final RubyClass outputClass = (RubyClass) args[0]; - // Calling "new" here manually to allow mocking the ctor in RSpec Tests - final IRubyObject output = outputClass.callMethod(context, "new", pluginArgs); + final IRubyObject output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs); initOutputCallsite(outputClass); - output.callMethod(context, "metric=", args[1]); - output.callMethod(context, "execution_context=", args[2]); + output.callMethod(context, "metric=", metric); workers.append(output); workerQueue.add(output); } @@ -248,11 +253,15 @@ protected SimpleAbstractOutputStrategyExt(final Ruby runtime, final RubyClass me @JRubyMethod(required = 4) public IRubyObject initialize(final ThreadContext context, final IRubyObject[] args) { final RubyClass outputClass = (RubyClass) args[0]; + final IRubyObject metric = args[1]; + final ExecutionContextExt executionContext = (ExecutionContextExt) args[2]; + final RubyHash pluginArgs = (RubyHash) args[3]; + // TODO: fixup mocks // Calling "new" here manually to allow mocking the ctor in RSpec Tests - output = args[0].callMethod(context, "new", args[3]); + output = ContextualizerExt.initializePlugin(context, executionContext, outputClass, pluginArgs); + initOutputCallsite(outputClass); - output.callMethod(context, "metric=", args[1]); - output.callMethod(context, "execution_context=", args[2]); + output.callMethod(context, "metric=", metric); return this; } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java deleted file mode 100644 index e476ad4368b..00000000000 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/PluginFactory.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.logstash.config.ir.compiler; - -import co.elastic.logstash.api.Codec; -import org.jruby.RubyString; -import org.jruby.runtime.builtin.IRubyObject; -import co.elastic.logstash.api.Configuration; -import co.elastic.logstash.api.Context; -import co.elastic.logstash.api.Filter; -import co.elastic.logstash.api.Input; -import org.logstash.common.SourceWithMetadata; - -import java.util.Map; - -/** - * Factory that can instantiate Java plugins as well as Ruby plugins. - */ -public interface PluginFactory extends RubyIntegration.PluginFactory { - - Input buildInput(String name, String id, Configuration configuration, Context context); - - Filter buildFilter(String name, String id, Configuration configuration, Context context); - - final class Default implements PluginFactory { - - private final RubyIntegration.PluginFactory rubyFactory; - - public Default(final RubyIntegration.PluginFactory rubyFactory) { - this.rubyFactory = rubyFactory; - } - - @Override - public Input buildInput(final String name, final String id, final Configuration configuration, final Context context) { - return null; - } - - @Override - public Filter buildFilter(final String name, final String id, final Configuration configuration, final Context context) { - return null; - } - - @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { - return rubyFactory.buildInput(name, source, args, pluginArgs); - } - - @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, final Map pluginArgs) { - return rubyFactory.buildOutput(name, source, args, pluginArgs); - } - - @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, final Map pluginArgs) { - return rubyFactory.buildFilter(name, source, args, pluginArgs); - } - - @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { - return rubyFactory.buildCodec(name, source, args, pluginArgs); - } - - @Override - public Codec buildDefaultCodec(final String codecName) { - return null; - } - } -} diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index faac9f66d38..bc66371c67a 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -41,17 +41,13 @@ private RubyIntegration() { */ public interface PluginFactory { - IRubyObject buildInput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs); + IRubyObject buildInput(RubyString name, IRubyObject args, SourceWithMetadata source); - AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs); + AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, SourceWithMetadata source); - AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args, - Map pluginArgs); + AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, SourceWithMetadata source); - IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, - Map pluginArgs); + IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source); Codec buildDefaultCodec(String codecName); diff --git a/logstash-core/src/main/java/org/logstash/execution/Engine.java b/logstash-core/src/main/java/org/logstash/execution/Engine.java new file mode 100644 index 00000000000..112b6bf2d84 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/Engine.java @@ -0,0 +1,7 @@ +package org.logstash.execution; + +public enum Engine { + RUBY, + JAVA, + ; +} diff --git a/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java b/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java index e99cd52aa98..f7815c746cc 100644 --- a/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/ExecutionContextExt.java @@ -25,6 +25,7 @@ import org.jruby.RubyObject; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; @@ -45,14 +46,16 @@ public ExecutionContextExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } - @JRubyMethod(required = 5) + @JRubyMethod(required = 2, optional = 1) public ExecutionContextExt initialize(final ThreadContext context, final IRubyObject[] args) { pipeline = args[0]; agent = args[1]; - dlqWriter = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( - context.runtime, RubyUtil.PLUGIN_DLQ_WRITER_CLASS - ).initialize(context, args[4], args[2], args[3]); + if (args.length > 2 && !args[2].isNil()) { + dlqWriter = (AbstractDeadLetterQueueWriterExt) args[2]; + } else { + dlqWriter = (AbstractDeadLetterQueueWriterExt) RubyUtil.DUMMY_DLQ_WRITER_CLASS.newInstance(context, Block.NULL_BLOCK); + } return this; } @@ -73,6 +76,9 @@ public IRubyObject pipeline(final ThreadContext context) { @JRubyMethod(name = "pipeline_id") public IRubyObject pipelineId(final ThreadContext context) { + if (pipeline.isNil()) { + return context.nil; + } return pipeline.callMethod(context, "pipeline_id"); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java index aea003dde43..7d58f2c1a15 100644 --- a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java @@ -77,7 +77,8 @@ public JavaBasePipelineExt initialize(final ThreadContext context, final IRubyOb new ExecutionContextFactoryExt( context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS ).initialize(context, args[3], this, dlqWriter(context)), - RubyUtil.FILTER_DELEGATOR_CLASS + RubyUtil.FILTER_DELEGATOR_CLASS, + Engine.JAVA ), getSecretStore(context) ); diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java new file mode 100644 index 00000000000..becba99cb85 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/ContextualizerExt.java @@ -0,0 +1,99 @@ +package org.logstash.plugins.factory; + +import org.jruby.RubyClass; +import org.jruby.RubyHash; +import org.jruby.RubyModule; +import org.jruby.anno.JRubyMethod; +import org.jruby.anno.JRubyModule; +import org.jruby.runtime.Block; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.execution.ExecutionContextExt; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.jruby.runtime.Helpers.invokeSuper; +import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE; + +/** + * The {@link ContextualizerExt} is used to inject {@link org.logstash.execution.ExecutionContextExt } to plugins + * before they are initialized. + * + * @see ContextualizerExt#initializePlugin(ThreadContext, IRubyObject, IRubyObject[], Block) + */ +@JRubyModule(name = "Contextualizer") +public class ContextualizerExt { + + private static final String EXECUTION_CONTEXT_IVAR_NAME = "@execution_context"; + + /* + * @overload PluginContextualizer::initialize_plugin(execution_context, plugin_class, *plugin_args, &pluginBlock) + */ + @JRubyMethod(name = "initialize_plugin", meta = true, required = 2, rest = true) + public static IRubyObject initializePlugin(final ThreadContext context, + final IRubyObject recv, + final IRubyObject[] args, + final Block block) { + final List argsList = new ArrayList<>(Arrays.asList(args)); + final ExecutionContextExt executionContext = RubyUtil.nilSafeCast(argsList.remove(0)); + final RubyClass pluginClass = (RubyClass) argsList.remove(0); + final IRubyObject[] pluginArgs = argsList.toArray(new IRubyObject[]{}); + + return initializePlugin(context, (RubyModule) recv, executionContext, pluginClass, pluginArgs, block); + } + + public static IRubyObject initializePlugin(final ThreadContext context, + @Nullable final ExecutionContextExt executionContextExt, + final RubyClass pluginClass, + final RubyHash pluginArgs) { + return initializePlugin(context, PLUGIN_CONTEXTUALIZER_MODULE, executionContextExt, pluginClass, new IRubyObject[]{pluginArgs}, Block.NULL_BLOCK); + } + + private static IRubyObject initializePlugin(final ThreadContext context, + final RubyModule recv, + @Nullable final ExecutionContextExt executionContext, + final RubyClass pluginClass, + final IRubyObject[] pluginArgs, + final Block block) { + synchronized (ContextualizerExt.class) { + if (!pluginClass.hasModuleInPrepends(recv)) { + pluginClass.prepend(context, new IRubyObject[]{recv}); + } + } + + final IRubyObject[] pluginInitArgs; + if (executionContext == null) { + pluginInitArgs = pluginArgs; + } else { + List pluginInitArgList = new ArrayList<>(1 + pluginArgs.length); + pluginInitArgList.add(executionContext); + pluginInitArgList.addAll(Arrays.asList(pluginArgs)); + pluginInitArgs = pluginInitArgList.toArray(new IRubyObject[]{}); + } + + // We must use IRubyObject#callMethod(...,"new",...) here to continue supporting + // mocking/validating from rspec. + return pluginClass.callMethod(context, "new", pluginInitArgs, block); + } + + @JRubyMethod(name = "initialize", rest = true, frame = true) // framed for invokeSuper + public static IRubyObject initialize(final ThreadContext context, + final IRubyObject recv, + final IRubyObject[] args, + final Block block) { + final List argsList = new ArrayList<>(Arrays.asList(args)); + + if (args.length > 0 && args[0] instanceof ExecutionContextExt) { + final ExecutionContextExt executionContext = (ExecutionContextExt) argsList.remove(0); + recv.getInstanceVariables().setInstanceVariable(EXECUTION_CONTEXT_IVAR_NAME, executionContext); + } + + final IRubyObject[] restArgs = argsList.toArray(new IRubyObject[]{}); + + return invokeSuper(context, recv, restArgs, block); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java index d72ef4a5cd9..2799eb12ce4 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java @@ -46,10 +46,14 @@ public ExecutionContextFactoryExt initialize(final ThreadContext context, final @JRubyMethod public ExecutionContextExt create(final ThreadContext context, final IRubyObject id, final IRubyObject classConfigName) { + final AbstractDeadLetterQueueWriterExt dlqWriterForInstance = new AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt( + context.runtime, RubyUtil.PLUGIN_DLQ_WRITER_CLASS + ).initialize(context, dlqWriter, id, classConfigName); + return new ExecutionContextExt( context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS ).initialize( - context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter} + context, new IRubyObject[]{pipeline, agent, dlqWriterForInstance} ); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java index a226e57d56d..0d3226e0755 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java @@ -5,6 +5,7 @@ import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; @@ -13,6 +14,7 @@ import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.compiler.*; import org.logstash.config.ir.graph.Vertex; +import org.logstash.execution.Engine; import org.logstash.execution.ExecutionContextExt; import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; @@ -21,6 +23,7 @@ import org.logstash.plugins.PluginLookup; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; @JRubyClass(name = "PluginFactory") public final class PluginFactoryExt extends RubyBasicObject @@ -35,7 +38,9 @@ public interface PluginResolver { private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id"); - private final Collection pluginsById = new HashSet<>(); + private final Collection pluginsById = ConcurrentHashMap.newKeySet(); + + private Engine engine; private PipelineIR lir; @@ -43,7 +48,7 @@ public interface PluginResolver { private PluginMetricsFactoryExt metrics; - private RubyClass filterClass; + private RubyClass filterDelegatorClass; private ConfigVariableExpander configVariables; @@ -54,16 +59,22 @@ public interface PluginResolver { @JRubyMethod(name = "filter_delegator", meta = true, required = 5) public static IRubyObject filterDelegator(final ThreadContext context, final IRubyObject recv, final IRubyObject... args) { + // filterDelegatorClass, klass, rubyArgs, typeScopedMetric, executionCntx + final RubyClass filterDelegatorClass = (RubyClass) args[0]; + final RubyClass klass = (RubyClass) args[1]; final RubyHash arguments = (RubyHash) args[2]; - final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); + final AbstractMetricExt typeScopedMetric = (AbstractMetricExt) args[3]; + final ExecutionContextExt executionContext = (ExecutionContextExt) args[4]; + + final IRubyObject filterInstance = ContextualizerExt.initializePlugin(context, executionContext, klass, arguments); + final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY); filterInstance.callMethod( context, "metric=", - ((AbstractMetricExt) args[3]).namespace(context, id.intern()) + typeScopedMetric.namespace(context, id.intern()) ); - filterInstance.callMethod(context, "execution_context=", args[4]); - return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS) - .initialize(context, filterInstance, id); + + return filterDelegatorClass.newInstance(context, filterInstance, id, Block.NULL_BLOCK); } public PluginFactoryExt(final Ruby runtime, final RubyClass metaClass) { @@ -80,25 +91,33 @@ public PluginFactoryExt initialize(final ThreadContext context, final IRubyObject[] args) { return init( args[0].toJava(PipelineIR.class), - (PluginMetricsFactoryExt) args[1], (ExecutionContextFactoryExt) args[2], - (RubyClass) args[3] + (PluginMetricsFactoryExt) args[1], + (ExecutionContextFactoryExt) args[2], + (RubyClass) args[3], + EnvironmentVariableProvider.defaultProvider(), + Engine.RUBY ); } - public PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics, - final ExecutionContextFactoryExt executionContextFactoryExt, - final RubyClass filterClass) { - return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider()); + public PluginFactoryExt init(final PipelineIR lir, + final PluginMetricsFactoryExt metrics, + final ExecutionContextFactoryExt executionContextFactoryExt, + final RubyClass filterClass, + final Engine engine) { + return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider(), engine); } - PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics, + PluginFactoryExt init(final PipelineIR lir, + final PluginMetricsFactoryExt metrics, final ExecutionContextFactoryExt executionContextFactoryExt, final RubyClass filterClass, - final EnvironmentVariableProvider envVars) { + final EnvironmentVariableProvider envVars, + final Engine engine) { this.lir = lir; this.metrics = metrics; this.executionContextFactory = executionContextFactoryExt; - this.filterClass = filterClass; + this.filterDelegatorClass = filterClass; + this.engine = engine; this.pluginCreatorsRegistry.put(PluginLookup.PluginType.INPUT, new InputPluginCreator(this)); this.pluginCreatorsRegistry.put(PluginLookup.PluginType.CODEC, new CodecPluginCreator()); this.pluginCreatorsRegistry.put(PluginLookup.PluginType.FILTER, new FilterPluginCreator()); @@ -109,71 +128,102 @@ PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metric @SuppressWarnings("unchecked") @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(), - source, (Map) args, pluginArgs + RubyUtil.RUBY.getCurrentContext(), + PluginLookup.PluginType.INPUT, + name.asJavaString(), + (RubyHash) args, + source ); } @SuppressWarnings("unchecked") @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return (AbstractOutputDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(), - source, (Map) args, pluginArgs + (RubyHash) args, source ); } @SuppressWarnings("unchecked") @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return (AbstractFilterDelegatorExt) plugin( RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(), - source, (Map) args, pluginArgs + (RubyHash) args, source ); } @SuppressWarnings("unchecked") @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { + public IRubyObject buildCodec(final RubyString name, + final IRubyObject args, + final SourceWithMetadata source) { return plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - name.asJavaString(), source, (Map) args, pluginArgs + RubyUtil.RUBY.getCurrentContext(), + PluginLookup.PluginType.CODEC, + name.asJavaString(), + (RubyHash) args, + source ); } @Override public Codec buildDefaultCodec(String codecName) { return (Codec) JavaUtil.unwrapJavaValue(plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - codecName, null, Collections.emptyMap(), Collections.emptyMap() + RubyUtil.RUBY.getCurrentContext(), + PluginLookup.PluginType.CODEC, + codecName, + RubyHash.newHash(RubyUtil.RUBY), + null )); } @SuppressWarnings("unchecked") @JRubyMethod(required = 3, optional = 1) public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) { + final SourceWithMetadata source = args.length > 3 ? (SourceWithMetadata) JavaUtil.unwrapIfJavaObject(args[3]) : null; + return plugin( context, PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)), args[1].asJavaString(), - JavaUtil.unwrapIfJavaObject(args[2]), - args.length > 3 ? (Map) args[3] : new HashMap<>(), - null + (RubyHash) args[2], + source ); } @SuppressWarnings("unchecked") - private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name, - SourceWithMetadata source, final Map args, - Map pluginArgs) { - final String id = generateOrRetrievePluginId(context, type, name, source); - pluginsById.add(id); + private IRubyObject plugin(final ThreadContext context, + final PluginLookup.PluginType type, + final String name, + final RubyHash args, + final SourceWithMetadata source) { + final String id = generateOrRetrievePluginId(type, source, args); + + if (id == null) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name + ) + ); + } + if (!pluginsById.add(id)) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format("Two plugins have the id '%s', please fix this conflict", id) + ); + } + final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel()); final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name); @@ -199,19 +249,19 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } else if (type == PluginLookup.PluginType.FILTER) { return filterDelegator( context, null, - filterClass, klass, rubyArgs, typeScopedMetric, executionCntx); + filterDelegatorClass, klass, rubyArgs, typeScopedMetric, executionCntx); } else { - final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs); + final IRubyObject pluginInstance = ContextualizerExt.initializePlugin(context, executionCntx, klass, rubyArgs); + final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id)); scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name")); pluginInstance.callMethod(context, "metric=", scopedMetric); - pluginInstance.callMethod(context, "execution_context=", executionCntx); return pluginInstance; } } else { - if (pluginArgs == null) { - String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." + - " The Java execution engine is required to run Java plugins.", name); + if (engine != Engine.JAVA) { + String err = String.format("Cannot start the Java plugin '%s' in the %s execution engine." + + " The Java execution engine is required to run Java plugins.", name, engine); throw new IllegalStateException(err); } @@ -221,38 +271,97 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.Plugi } Context contextWithMetrics = executionContextFactory.toContext(type, metrics.getRoot(context)); - return pluginCreator.createDelegator(name, pluginArgs, id, typeScopedMetric, pluginClass, contextWithMetrics); + return pluginCreator.createDelegator(name, convertToJavaCoercible(args), id, typeScopedMetric, pluginClass, contextWithMetrics); } } - private String generateOrRetrievePluginId(ThreadContext context, PluginLookup.PluginType type, String name, - SourceWithMetadata source) { - final String id; - if (type == PluginLookup.PluginType.CODEC) { - id = UUID.randomUUID().toString(); + private Map convertToJavaCoercible(Map input) { + final Map output = new HashMap<>(input); + + // Intercept Codecs + for (final Map.Entry entry : input.entrySet()) { + final String key = entry.getKey(); + final Object value = entry.getValue(); + if (value instanceof IRubyObject) { + final Object unwrapped = JavaUtil.unwrapJavaValue((IRubyObject) value); + if (unwrapped instanceof Codec) { + output.put(key, unwrapped); + } + } + } + + return output; + } + + // TODO: caller seems to think that the args is `Map`, but + // at least any `id` present is actually a `String`. + private String generateOrRetrievePluginId(final PluginLookup.PluginType type, + final SourceWithMetadata source, + final Map args) { + final Optional unprocessedId; + if (source == null) { + unprocessedId = extractId(() -> extractIdFromArgs(args), + this::generateUUID); } else { - String unresolvedId = lir.getGraph().vertices() - .filter(v -> v.getSourceWithMetadata() != null - && v.getSourceWithMetadata().equalsWithoutText(source)) - .findFirst() - .map(Vertex::getId).orElse(null); - id = (String) configVariables.expand(unresolvedId); + unprocessedId = extractId(() -> extractIdFromLIR(source), + () -> extractIdFromArgs(args), + () -> generateUUIDForCodecs(type)); } - if (id == null) { - throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format( - "Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name - ) - ); + + return unprocessedId + .map(configVariables::expand) + .filter(String.class::isInstance) + .map(String.class::cast) + .orElse(null); + } + + private Optional extractId(final IdExtractor... extractors) { + for (IdExtractor extractor : extractors) { + final Optional extracted = extractor.extract(); + if (extracted.isPresent()) { + return extracted; + } } - if (pluginsById.contains(id)) { - throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format("Two plugins have the id '%s', please fix this conflict", id) - ); + return Optional.empty(); + } + + @FunctionalInterface + interface IdExtractor { + Optional extract(); + } + + private Optional extractIdFromArgs(final Map args) { + if (!args.containsKey("id")) { + return Optional.empty(); + } + + final Object explicitId = args.get("id"); + if (explicitId instanceof String) { + return Optional.of((String) explicitId); + } else if (explicitId instanceof RubyString) { + return Optional.of(((RubyString) explicitId).asJavaString()); + } else { + return Optional.empty(); } - return id; + } + + private Optional generateUUID() { + return Optional.of(UUID.randomUUID().toString()); + } + + private Optional generateUUIDForCodecs(final PluginLookup.PluginType pluginType) { + if (pluginType == PluginLookup.PluginType.CODEC) { + return generateUUID(); + } + return Optional.empty(); + } + + private Optional extractIdFromLIR(final SourceWithMetadata source) { + return lir.getGraph().vertices() + .filter(v -> v.getSourceWithMetadata() != null + && v.getSourceWithMetadata().equalsWithoutText(source)) + .findFirst() + .map(Vertex::getId); } ExecutionContextFactoryExt getExecutionContextFactory() { diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index 0ac7195273f..bf93b73d18a 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -54,12 +54,8 @@ import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import org.logstash.config.ir.compiler.ComputeStepSyntaxElement; import org.logstash.config.ir.compiler.FilterDelegatorExt; -import org.logstash.config.ir.compiler.PluginFactory; +import org.logstash.config.ir.compiler.RubyIntegration; import org.logstash.ext.JrubyEventExtLibrary; -import co.elastic.logstash.api.Configuration; -import co.elastic.logstash.api.Filter; -import co.elastic.logstash.api.Input; -import co.elastic.logstash.api.Context; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -494,9 +490,9 @@ private Supplier>> mockOutpu } /** - * Configurable Mock {@link PluginFactory} + * Configurable Mock {@link RubyIntegration.PluginFactory} */ - static final class MockPluginFactory implements PluginFactory { + static final class MockPluginFactory implements RubyIntegration.PluginFactory { private final Map> inputs; @@ -514,20 +510,20 @@ static final class MockPluginFactory implements PluginFactory { } @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(final RubyString name, final IRubyObject args, + SourceWithMetadata source) { return setupPlugin(name, inputs); } @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(final RubyString name, final IRubyObject args, + SourceWithMetadata source) { return PipelineTestUtil.buildOutput(setupPlugin(name, outputs)); } @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(final RubyString name, final IRubyObject args, + SourceWithMetadata source) { final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name); return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) @@ -535,8 +531,7 @@ public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithM } @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { + public IRubyObject buildCodec(final RubyString name, final IRubyObject args, SourceWithMetadata source) { throw new IllegalStateException("No codec setup expected in this test."); } @@ -555,17 +550,6 @@ private static T setupPlugin(final RubyString name, } return suppliers.get(name.asJavaString()).get(); } - - @Override - public Input buildInput(final String name, final String id, final Configuration configuration, final Context context) { - return null; - } - - @Override - public Filter buildFilter(final String name, final String id, - final Configuration configuration, final Context context) { - return null; - } } @Test @@ -698,9 +682,9 @@ private String createBigFilterSection(int numFilters) { } /** - * Fixed Mock {@link PluginFactory} + * Fixed Mock {@link RubyIntegration.PluginFactory} * */ - static final class FixedPluginFactory implements PluginFactory { + static final class FixedPluginFactory implements RubyIntegration.PluginFactory { private Supplier input; private Supplier filter; @@ -714,27 +698,17 @@ static final class FixedPluginFactory implements PluginFactory { } @Override - public Input buildInput(String name, String id, Configuration configuration, Context context) { - return null; - } - - @Override - public Filter buildFilter(String name, String id, Configuration configuration, Context context) { - return null; - } - - @Override - public IRubyObject buildInput(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(RubyString name, IRubyObject args, SourceWithMetadata source) { return this.input.get(); } @Override - public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, SourceWithMetadata source) { return PipelineTestUtil.buildOutput(this.output.get()); } @Override - public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, SourceWithMetadata source) { final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name); return new FilterDelegatorExt( RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS) @@ -742,7 +716,7 @@ public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadat } @Override - public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, Map pluginArgs) { + public IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source) { return null; } diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java deleted file mode 100644 index ebc3990690d..00000000000 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/PluginFactoryTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.logstash.config.ir.compiler; - -import org.junit.Test; - -/** - * Tests for {@link PluginFactory.Default}. - */ -public final class PluginFactoryTest { - - @Test - public void testBuildJavaFilter() throws Exception { - - } - -} diff --git a/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java b/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java index ca2c2f02c8c..2476b4c0b8e 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java +++ b/logstash-core/src/test/java/org/logstash/plugins/TestPluginFactory.java @@ -30,31 +30,29 @@ import org.logstash.plugins.codecs.Line; import java.util.Collections; -import java.util.Map; public class TestPluginFactory implements RubyIntegration.PluginFactory { @Override - public IRubyObject buildInput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs) { + public IRubyObject buildInput(RubyString name, IRubyObject args, + SourceWithMetadata source) { return null; } @Override - public AbstractOutputDelegatorExt buildOutput(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs) { + public AbstractOutputDelegatorExt buildOutput(RubyString name, IRubyObject args, + SourceWithMetadata source) { return null; } @Override - public AbstractFilterDelegatorExt buildFilter(RubyString name, SourceWithMetadata source, - IRubyObject args, Map pluginArgs) { + public AbstractFilterDelegatorExt buildFilter(RubyString name, IRubyObject args, + SourceWithMetadata source) { return null; } @Override - public IRubyObject buildCodec(RubyString name, SourceWithMetadata source, IRubyObject args, - Map pluginArgs) { + public IRubyObject buildCodec(RubyString name, IRubyObject args, SourceWithMetadata source) { return null; } diff --git a/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java b/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java index ded3049a657..1f621b90b85 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java @@ -30,6 +30,7 @@ import org.logstash.config.ir.InvalidIRException; import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.RubyEnvTestCase; +import org.logstash.execution.Engine; import org.logstash.instrument.metrics.NamespacedMetricExt; import org.logstash.plugins.MetricTestCase; import org.logstash.plugins.PluginLookup; @@ -93,13 +94,12 @@ public void testPluginIdResolvedWithEnvironmentVariables() throws InvalidIRExcep envVars.put("CUSTOM", "test"); PluginFactoryExt sut = new PluginFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS, mockPluginResolver); - sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get); + sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get, Engine.JAVA); RubyString pluginName = RubyUtil.RUBY.newString("mockinput"); // Exercise - IRubyObject pluginInstance = sut.buildInput(pluginName, sourceWithMetadata, RubyHash.newHash(RubyUtil.RUBY), - Collections.emptyMap()); + IRubyObject pluginInstance = sut.buildInput(pluginName, RubyHash.newHash(RubyUtil.RUBY), sourceWithMetadata); //Verify IRubyObject id = pluginInstance.callMethod(RUBY.getCurrentContext(), "id");