diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index 9c4d445dc59..dc22c3f4bea 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -19,7 +19,6 @@ require "logstash/plugins/ecs_compatibility_support" require "concurrent" require "logstash/plugins/event_factory_support" -require "securerandom" require_relative 'plugin_metadata' @@ -65,18 +64,11 @@ def eql?(other) end def initialize(params={}) - @logger = self.logger - @deprecation_logger = self.deprecation_logger - # need to access settings statically because plugins are initialized in config_ast with no context. - settings = LogStash::SETTINGS - @slow_logger = self.slow_logger(settings.get("slowlog.threshold.warn").to_nanos, - settings.get("slowlog.threshold.info").to_nanos, - settings.get("slowlog.threshold.debug").to_nanos, - settings.get("slowlog.threshold.trace").to_nanos) @params = LogStash::Util.deep_clone(params) # The id should always be defined normally, but in tests that might not be the case # In the future we may make this more strict in the Plugin API - @params["id"] ||= "#{self.class.config_name}_#{SecureRandom.uuid}" + @params["id"] ||= LogStash::Plugins::PluginFactory.generate_plugin_id + __initialize_logging # after id is generated end # Return a uniq ID for this plugin configuration, by default @@ -191,4 +183,32 @@ def execution_context=(new_context) def execution_context @execution_context || LogStash::ExecutionContext::Empty end + + # override Loggable (self.class.logger) delegating methods : + + attr_reader :logger + attr_reader :deprecation_logger + attr_reader :slow_logger + + private + + def __initialize_logging + @logger = LogStash::Logging::PluginLogger.new(self) + @deprecation_logger = self.class.deprecation_logger + # need to access settings statically because plugins are initialized in config_ast with no context. + settings = LogStash::SETTINGS + @slow_logger = self.class.slow_logger(settings.get("slowlog.threshold.warn").to_nanos, + settings.get("slowlog.threshold.info").to_nanos, + settings.get("slowlog.threshold.debug").to_nanos, + settings.get("slowlog.threshold.trace").to_nanos) + end + + # TODO do we want to keep this around due plugin specs mocking logger through the class.logger call?! + # + # @override LogStash::Util::Loggable.logger + # @private + # def self.logger(plugin = nil) + # plugin.nil? ? super() : LogStash::Logging::PluginLogger.new(plugin) + # end + end # class LogStash::Plugin diff --git a/logstash-core/spec/logstash/config/mixin_spec.rb b/logstash-core/spec/logstash/config/mixin_spec.rb index 7928409bb5e..2ab91ed744b 100644 --- a/logstash-core/spec/logstash/config/mixin_spec.rb +++ b/logstash-core/spec/logstash/config/mixin_spec.rb @@ -37,7 +37,7 @@ end it "should not log the password" do - expect(LogStash::Logging::Logger).to receive(:new).with(anything).and_return(double_logger) + expect(LogStash::Logging::PluginLogger).to receive(:new).with(anything).and_return(double_logger) expect(double_logger).to receive(:warn) do |arg1,arg2| message = 'You are using a deprecated config setting "old_opt" set in test_deprecated. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. this is old school If you have any questions about this, please visit the #logstash channel on freenode irc.' expect(arg1).to eq(message) diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 5b406ad05a1..370de7496f7 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -507,10 +507,16 @@ def flush(options) pipeline.close end - it "should use LIR provided IDs" do - expect(pipeline.inputs.first.id).to eq(pipeline.lir.input_plugin_vertices.first.id) - expect(pipeline.filters.first.id).to eq(pipeline.lir.filter_plugin_vertices.first.id) - expect(pipeline.outputs.first.id).to eq(pipeline.lir.output_plugin_vertices.first.id) + it "should generate plugin ids" do + input_id = pipeline.inputs.first.id + filter_id = pipeline.filters.first.id + output_id = pipeline.outputs.first.id + [ input_id, filter_id, output_id ].each do |plugin_id| + expect(plugin_id).to match /([0-9]|[a-f])+/ + end + expect( input_id ).to_not eq filter_id + expect( input_id ).to_not eq output_id + expect( output_id ).to_not eq filter_id end end diff --git a/logstash-core/spec/logstash/plugin_spec.rb b/logstash-core/spec/logstash/plugin_spec.rb index 1656a3855b6..70936e24a14 100644 --- a/logstash-core/spec/logstash/plugin_spec.rb +++ b/logstash-core/spec/logstash/plugin_spec.rb @@ -411,8 +411,8 @@ def register; end context "when the id is not provided provided" do subject { plugin.new(config) } - it "return a human readable ID" do - expect(subject.id).to match(/^simple_plugin_/) + it "generates an id" do + expect(subject.id).to match /([0-9]|[a-f])+/ end end end diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index abf86480298..ba9c3b14dc2 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -72,6 +72,7 @@ import org.logstash.log.DeprecationLoggerExt; import org.logstash.log.LoggableExt; import org.logstash.log.LoggerExt; +import org.logstash.log.PluginLoggerExt; import org.logstash.log.SlowLoggerExt; import org.logstash.plugins.HooksRegistryExt; import org.logstash.plugins.UniversalPluginExt; @@ -477,6 +478,8 @@ public final class RubyUtil { final RubyModule loggingModule = LOGSTASH_MODULE.defineOrGetModuleUnder("Logging"); LOGGER = loggingModule.defineClassUnder("Logger", RUBY.getObject(), LoggerExt::new); LOGGER.defineAnnotatedMethods(LoggerExt.class); + final RubyClass pluginLoggerClass = loggingModule.defineClassUnder("PluginLogger", LOGGER, PluginLoggerExt::new); + pluginLoggerClass.defineAnnotatedMethods(PluginLoggerExt.class); SLOW_LOGGER = loggingModule.defineClassUnder( "SlowLogger", RUBY.getObject(), SlowLoggerExt::new); SLOW_LOGGER.defineAnnotatedMethods(SlowLoggerExt.class); diff --git a/logstash-core/src/main/java/org/logstash/common/Util.java b/logstash-core/src/main/java/org/logstash/common/Util.java index 0c47a53136e..53f69b7a640 100644 --- a/logstash-core/src/main/java/org/logstash/common/Util.java +++ b/logstash-core/src/main/java/org/logstash/common/Util.java @@ -20,6 +20,8 @@ package org.logstash.common; +import org.apache.commons.codec.digest.DigestUtils; + import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -41,6 +43,14 @@ public static String digest(String base) { return bytesToHexString(hash); } + /** + * Generates a short digest from bytes. + * @return a hex digest string (16 characters long) + */ + public static String shortDigest(final byte[] bytes) { // using MD5 for speed + return new DigestUtils("MD5").digestAsHex(bytes).substring(0, 16); + } + public static String bytesToHexString(byte[] bytes) { StringBuilder hexString = new StringBuilder(); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java index d79c2d8c37c..70dd981324d 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java @@ -46,7 +46,8 @@ public abstract class AbstractFilterDelegatorExt extends RubyObject { protected AbstractNamespacedMetricExt metricEvents; - protected RubyString id; + protected String id; + private transient RubyString idString; protected LongCounter eventMetricOut; @@ -60,7 +61,7 @@ public AbstractFilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) protected void initMetrics(final String id, final AbstractNamespacedMetricExt namespacedMetric) { final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); - this.id = RubyString.newString(context.runtime, id); + this.setId(context, id); synchronized(namespacedMetric.getMetric()) { metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY); eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY); @@ -76,7 +77,7 @@ public IRubyObject register(final ThreadContext context) { return context.nil; } - protected abstract void doRegister(final ThreadContext context); + protected abstract IRubyObject doRegister(final ThreadContext context); @JRubyMethod public IRubyObject close(final ThreadContext context) { @@ -120,11 +121,20 @@ public IRubyObject configName(final ThreadContext context) { protected abstract IRubyObject getConfigName(ThreadContext context); - @JRubyMethod(name = "id") - public IRubyObject getId() { + public String getId() { return id; } + void setId(ThreadContext context, String id) { + this.id = id; + this.idString = (RubyString) RubyString.newString(context.runtime, id).freeze(context); + } + + @JRubyMethod(name = "id") + public IRubyObject id() { + return idString == null ? getRuntime().getNil() : idString; + } + @JRubyMethod(name = "multi_filter") @SuppressWarnings({"unchecked", "rawtypes"}) public RubyArray multiFilter(final IRubyObject input) { diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java index 7dd28e955d8..d2ae6a37eaf 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java @@ -51,7 +51,8 @@ public abstract class AbstractOutputDelegatorExt extends RubyObject { private AbstractNamespacedMetricExt metricEvents; - private RubyString id; + private String id; + private transient RubyString idString; private LongCounter eventMetricOut; @@ -90,11 +91,20 @@ public IRubyObject configName(final ThreadContext context) { return getConfigName(context); } - @JRubyMethod(name = "id") - public IRubyObject getId() { + public String getId() { return id; } + private void setId(ThreadContext context, String id) { + this.id = id; + this.idString = (RubyString) RubyString.newString(context.runtime, id).freeze(context); + } + + @JRubyMethod(name = "id") + public IRubyObject id() { + return idString == null ? getRuntime().getNil() : idString; + } + @JRubyMethod public IRubyObject metric() { return metric; @@ -125,9 +135,9 @@ public IRubyObject multiReceive(final IRubyObject events) { } protected void initMetrics(final String id, final AbstractMetricExt metric) { - this.metric = metric; final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); - this.id = RubyString.newString(context.runtime, id); + this.setId(context, id); + this.metric = metric; synchronized (metric) { namespacedMetric = metric.namespace(context, context.runtime.newSymbol(id)); metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java index d61e0c13221..7a04af7a470 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java @@ -26,7 +26,6 @@ import org.jruby.RubyClass; import org.jruby.RubyHash; import org.jruby.RubyObject; -import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; @@ -35,8 +34,6 @@ import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.counter.LongCounter; -import java.util.UUID; - import static org.logstash.RubyUtil.RUBY; @JRubyClass(name = "FilterDelegator") @@ -56,12 +53,11 @@ public final class FilterDelegatorExt extends AbstractFilterDelegatorExt { @JRubyMethod(name="initialize") public IRubyObject initialize(final ThreadContext context, final IRubyObject filter, final IRubyObject id) { - this.id = (RubyString) id; this.filter = filter; filterClass = filter.getSingletonClass().getRealClass(); filterMethod = filterClass.searchMethod(FILTER_METHOD_NAME); final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) filter.callMethod(context, "metric"); - initMetrics(this.id.asJavaString(), namespacedMetric); + initMetrics(id.asJavaString(), namespacedMetric); flushes = filter.respondsTo("flush"); return this; } @@ -75,7 +71,7 @@ public FilterDelegatorExt initForTesting(final IRubyObject filter, RubyObject co filterMethod = filter.getMetaClass().searchMethod(FILTER_METHOD_NAME); flushes = filter.respondsTo("flush"); filterClass = configNameDouble.getType(); - id = RUBY.newString(UUID.randomUUID().toString()); + this.setId(filter.getRuntime().getCurrentContext(), RubyIntegration.generatePluginId()); return this; } @@ -83,9 +79,8 @@ public FilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } - @Override - protected void doRegister(final ThreadContext context) { - filter.callMethod(context, "register"); + protected IRubyObject registerImpl(final ThreadContext context) { + return filter.callMethod(context, "register"); } @Override @@ -118,12 +113,22 @@ protected IRubyObject getConfigName(final ThreadContext context) { return filterClass.callMethod(context, "config_name"); } + @Override + public IRubyObject doRegister(final ThreadContext context) { + try { + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); + return registerImpl(context); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); + } + } + @Override @SuppressWarnings({"rawtypes"}) protected RubyArray doMultiFilter(final RubyArray batch) { - final IRubyObject pluginId = this.getId(); - org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString()); try { + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); + return (RubyArray) filterMethod.call( RUBY.getCurrentContext(), filter, filterClass, FILTER_METHOD_NAME, batch); } finally { @@ -133,7 +138,22 @@ protected RubyArray doMultiFilter(final RubyArray batch) { @Override protected IRubyObject doFlush(final ThreadContext context, final RubyHash options) { - return filter.callMethod(context, "flush", options); + try { + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); + return filter.callMethod(context, "flush", options); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); + } + } + + @Override + public IRubyObject doClose(final ThreadContext context) { + try { + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); + return doCloseImpl(context); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); + } } @Override diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java index 043af3f9fb4..9a21ecafdca 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java @@ -89,7 +89,8 @@ protected RubyArray doMultiFilter(final RubyArray batch) { } @Override - protected void doRegister(ThreadContext context) { + protected IRubyObject doRegister(ThreadContext context) { + return context.nil; } @Override diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java index f99c0a5bc78..f95a202da81 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaInputDelegatorExt.java @@ -24,6 +24,7 @@ import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyObject; +import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; @@ -49,6 +50,8 @@ public class JavaInputDelegatorExt extends RubyObject { private Input input; + private transient RubyString idString; + private DecoratingQueueWriter decoratingQueueWriter; public JavaInputDelegatorExt(Ruby runtime, RubyClass metaClass) { @@ -58,11 +61,12 @@ public JavaInputDelegatorExt(Ruby runtime, RubyClass metaClass) { public static JavaInputDelegatorExt create(final JavaBasePipelineExt pipeline, final AbstractNamespacedMetricExt metric, final Input input, final Map pluginArgs) { + final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); final JavaInputDelegatorExt instance = new JavaInputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_INPUT_DELEGATOR_CLASS); - AbstractNamespacedMetricExt scopedMetric = metric.namespace(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newSymbol(input.getId())); - scopedMetric.gauge(RubyUtil.RUBY.getCurrentContext(), MetricKeys.NAME_KEY, RubyUtil.RUBY.newString(input.getName())); - instance.setMetric(RubyUtil.RUBY.getCurrentContext(), scopedMetric); + AbstractNamespacedMetricExt scopedMetric = metric.namespace(context, RubyUtil.RUBY.newSymbol(input.getId())); + scopedMetric.gauge(context, MetricKeys.NAME_KEY, RubyUtil.RUBY.newString(input.getName())); + instance.setMetric(context, scopedMetric); instance.input = input; instance.pipeline = pipeline; instance.initializeQueueWriter(pluginArgs); @@ -79,12 +83,13 @@ public IRubyObject start(final ThreadContext context) { } else { queueWriter = qw; } + final String pipelineId = pipeline.pipelineId().asJavaString(); Thread t = new Thread(() -> { - org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString()); - org.apache.logging.log4j.ThreadContext.put("plugin.id", this.getId(context).toString()); + org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipelineId); + org.apache.logging.log4j.ThreadContext.put("plugin.id", this.getId()); input.start(queueWriter::push); }); - t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId()); + t.setName(pipelineId + "_" + input.getName() + "_" + input.getId()); t.start(); return RubyUtil.toRubyObject(t); } @@ -105,9 +110,17 @@ public IRubyObject configName(final ThreadContext context) { return context.getRuntime().newString(input.getName()); } + public String getId() { + return input.getId(); + } + @JRubyMethod(name = "id") - public IRubyObject getId(final ThreadContext context) { - return context.getRuntime().newString(input.getId()); + public IRubyObject id(final ThreadContext context) { + IRubyObject idString = this.idString; + if (idString == null) { + idString = this.idString = (RubyString) context.runtime.newString(input.getId()).freeze(context); + } + return idString; } @JRubyMethod(name = "threadable") diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java index 6979d6d242e..fb025e74bc3 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java @@ -97,8 +97,7 @@ protected IRubyObject getConcurrency(final ThreadContext context) { @Override protected void doOutput(final Collection batch) { try { - final IRubyObject pluginId = this.getId(); - org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString()); + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); strategy.multiReceive(RUBY.getCurrentContext(), (IRubyObject) batch); } catch (final InterruptedException ex) { throw new IllegalStateException(ex); @@ -109,12 +108,22 @@ protected void doOutput(final Collection batch) @Override protected void close(final ThreadContext context) { - strategy.doClose(context); + try { + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); + strategy.doClose(context); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); + } } @Override protected void doRegister(final ThreadContext context) { - strategy.register(context); + try { + org.apache.logging.log4j.ThreadContext.put("plugin.id", getId()); + strategy.register(context); + } finally { + org.apache.logging.log4j.ThreadContext.remove("plugin.id"); + } } @Override diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index bc66371c67a..2969f421a79 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -21,11 +21,13 @@ package org.logstash.config.ir.compiler; import co.elastic.logstash.api.Codec; +import org.apache.commons.codec.digest.DigestUtils; import org.jruby.RubyString; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.common.SourceWithMetadata; +import org.logstash.common.Util; -import java.util.Map; +import java.util.Random; /** * This class holds interfaces implemented by Ruby concrete classes. @@ -52,4 +54,17 @@ public interface PluginFactory { Codec buildDefaultCodec(String codecName); } + + /** + * Generates a plugin id. + * @return a (random) generated plugin identifier + */ + public static String generatePluginId() { + // similar to UUID.randomUUID() but fast - we do not need "secure" random ids + byte[] randomBytes = new byte[16]; + new Random().nextBytes(randomBytes); // seeded from System.nanoTime() + // for improved log readability we limit the HEX string to a shorter length + return Util.shortDigest(randomBytes); + } + } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java b/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java index e5b73b08008..140929ff1e7 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/graph/Vertex.java @@ -20,6 +20,7 @@ package org.logstash.config.ir.graph; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; import org.logstash.common.SourceWithMetadata; @@ -224,7 +225,8 @@ public String getId() { // they have no source metadata. This might also be used in the future by alternate config languages which are // willing to take the hit. if (this.getSourceWithMetadata() != null) { - generatedId = Util.digest(this.graph.uniqueHash() + "|" + this.getSourceWithMetadata().uniqueHash()); + final String uniqueString = this.graph.uniqueHash() + "|" + this.getSourceWithMetadata().uniqueHash(); + generatedId = Util.shortDigest(uniqueString.getBytes(StandardCharsets.UTF_8)); } else { generatedId = this.uniqueHash(); } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index d08385be801..7b77c8368ee 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -46,6 +46,7 @@ import org.jruby.runtime.ThreadContext; import org.jruby.runtime.Visibility; import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.util.ByteList; import org.logstash.RubyUtil; import org.logstash.ackedqueue.QueueFactoryExt; import org.logstash.ackedqueue.ext.JRubyAckedQueueExt; @@ -148,19 +149,14 @@ public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod public final AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject pipelineConfig, final IRubyObject namespacedMetric, - final IRubyObject rubyLogger) - throws NoSuchAlgorithmException { + final IRubyObject rubyLogger) { reporter = new PipelineReporterExt( context.runtime, RubyUtil.PIPELINE_REPORTER_CLASS).initialize(context, rubyLogger, this ); pipelineSettings = pipelineConfig; configString = (RubyString) pipelineSettings.callMethod(context, "config_string"); configParts = pipelineSettings.toJava(PipelineConfig.class).getConfigParts(); - configHash = context.runtime.newString( - Hex.encodeHexString( - MessageDigest.getInstance("SHA1").digest(configString.getBytes()) - ) - ); + configHash = context.runtime.newString(Hex.encodeHexString(digestString(configString))); settings = pipelineSettings.callMethod(context, "settings"); final IRubyObject id = getSetting(context, "pipeline.id"); if (id.isNil()) { @@ -191,6 +187,19 @@ public final AbstractPipelineExt initialize(final ThreadContext context, return this; } + private static byte[] digestString(final RubyString content) { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("SHA1"); + } catch (NoSuchAlgorithmException ex) { + throw new AssertionError(ex); + } + + final ByteList bytes = content.getByteList(); + digest.update(bytes.getUnsafeBytes(), bytes.getBegin(), bytes.getRealSize()); + return digest.digest(); + } + /** * queue opening needs to happen out of the the initialize method because the * AbstractPipeline is used for pipeline config validation and the queue diff --git a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java index f860aebbf63..3e401858730 100644 --- a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java @@ -67,8 +67,7 @@ public JavaBasePipelineExt(final Ruby runtime, final RubyClass metaClass) { } @JRubyMethod(required = 4) - public JavaBasePipelineExt initialize(final ThreadContext context, final IRubyObject[] args) - throws IncompleteSourceWithMetadataException, NoSuchAlgorithmException { + public JavaBasePipelineExt initialize(final ThreadContext context, final IRubyObject[] args) { initialize(context, args[0], args[1], args[2]); lirExecution = new CompiledPipeline( lir, diff --git a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java index 1d59f8cdc70..406bac50b52 100644 --- a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java @@ -193,7 +193,7 @@ private RubyArray outputInfo(final ThreadContext context) { final AbstractOutputDelegatorExt delegator = (AbstractOutputDelegatorExt) output; final RubyHash hash = RubyHash.newHash(context.runtime); hash.op_aset(context, TYPE_KEY, delegator.configName(context)); - hash.op_aset(context, ID_KEY, delegator.getId()); + hash.op_aset(context, ID_KEY, delegator.id()); hash.op_aset(context, CONCURRENCY_KEY, delegator.concurrency(context)); result.add(hash); }); diff --git a/logstash-core/src/main/java/org/logstash/log/DeprecationLoggerExt.java b/logstash-core/src/main/java/org/logstash/log/DeprecationLoggerExt.java index 8dd2e1e2f5d..19670148a06 100644 --- a/logstash-core/src/main/java/org/logstash/log/DeprecationLoggerExt.java +++ b/logstash-core/src/main/java/org/logstash/log/DeprecationLoggerExt.java @@ -61,9 +61,9 @@ private void initialize(final String loggerName) { @JRubyMethod(name = "deprecated", required = 1, optional = 1) public IRubyObject rubyDeprecated(final ThreadContext context, final IRubyObject[] args) { if (args.length > 1) { - logger.deprecated(args[0].asJavaString(), args[1]); + logger.deprecated(args[0].toString(), args[1]); } else { - logger.deprecated(args[0].asJavaString()); + logger.deprecated(args[0].toString()); } return this; } diff --git a/logstash-core/src/main/java/org/logstash/log/LoggableExt.java b/logstash-core/src/main/java/org/logstash/log/LoggableExt.java index caef20eaf05..53cd3b97fd3 100644 --- a/logstash-core/src/main/java/org/logstash/log/LoggableExt.java +++ b/logstash-core/src/main/java/org/logstash/log/LoggableExt.java @@ -35,8 +35,8 @@ /** * JRuby extension, it's part of log4j wrapping for JRuby. - * */ -@JRubyModule(name = "Loggable") + */ +@JRubyModule(name = "LogStash::Util::Loggable") public final class LoggableExt { private LoggableExt() { @@ -53,21 +53,21 @@ public static RubyModule included(final ThreadContext context, final IRubyObject @JRubyMethod public static IRubyObject logger(final ThreadContext context, final IRubyObject self) { - return self.getSingletonClass().callMethod(context, "logger"); + return self.getMetaClass().callMethod(context, "logger"); } @JRubyMethod(name = "slow_logger", required = 4) public static IRubyObject slowLogger(final ThreadContext context, final IRubyObject self, final IRubyObject[] args) { - return self.getSingletonClass().callMethod(context, "slow_logger", args); + return self.getMetaClass().callMethod(context, "slow_logger", args); } @JRubyMethod(name= "deprecation_logger") public static IRubyObject deprecationLogger(final ThreadContext context, final IRubyObject self) { - return self.getSingletonClass().callMethod(context, "deprecation_logger"); + return self.getMetaClass().callMethod(context, "deprecation_logger"); } - private static String log4jName(final RubyModule self) { + static String log4jName(final RubyModule self) { String name; if (self.getBaseName() == null) { // anonymous module/class RubyModule real = self; diff --git a/logstash-core/src/main/java/org/logstash/log/LoggerExt.java b/logstash-core/src/main/java/org/logstash/log/LoggerExt.java index dae0f97de6c..a0d57a78eb6 100644 --- a/logstash-core/src/main/java/org/logstash/log/LoggerExt.java +++ b/logstash-core/src/main/java/org/logstash/log/LoggerExt.java @@ -40,16 +40,16 @@ import java.net.URI; /** - * JRuby extension, it's part of log4j wrapping for JRuby. - * Wrapper log4j Logger as Ruby like class - * */ -@JRubyClass(name = "Logger") + * JRuby extension, that wraps a (native) log4j2 logger. + * Provides a Ruby logger interface for Logstash. + */ +@JRubyClass(name = "LogStash::Logging::Logger") public class LoggerExt extends RubyObject { private static final long serialVersionUID = 1L; private static final Object CONFIG_LOCK = new Object(); - private Logger logger; + Logger logger; public LoggerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -57,97 +57,209 @@ public LoggerExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod public LoggerExt initialize(final ThreadContext context, final IRubyObject loggerName) { - logger = LogManager.getLogger(loggerName.asJavaString()); + initializeLogger(loggerName.asJavaString()); return this; } + void initializeLogger(final String name) { + this.logger = LogManager.getLogger(name); + } + + /** + * {@code logger.trace?} + * @param context JRuby context + * @return true/false + */ + @JRubyMethod(name = "trace?") + public RubyBoolean isTrace(final ThreadContext context) { + return logger.isTraceEnabled() ? context.tru : context.fals; + } + + /** + * {@code logger.debug?} + * @param context JRuby context + * @return true/false + */ @JRubyMethod(name = "debug?") public RubyBoolean isDebug(final ThreadContext context) { return logger.isDebugEnabled() ? context.tru : context.fals; } + /** + * {@code logger.info?} + * @param context JRuby context + * @return true/false + */ @JRubyMethod(name = "info?") public RubyBoolean isInfo(final ThreadContext context) { return logger.isInfoEnabled() ? context.tru : context.fals; } + /** + * {@code logger.error?} + * @param context JRuby context + * @return true/false + */ @JRubyMethod(name = "error?") public RubyBoolean isError(final ThreadContext context) { return logger.isErrorEnabled() ? context.tru : context.fals; } + /** + * {@code logger.warn?} + * @param context JRuby context + * @return true/false + */ @JRubyMethod(name = "warn?") public RubyBoolean isWarn(final ThreadContext context) { return logger.isWarnEnabled() ? context.tru : context.fals; } + /** + * {@code logger.fatal?} + * @param context JRuby context + * @return true/false + */ @JRubyMethod(name = "fatal?") public RubyBoolean isFatal(final ThreadContext context) { return logger.isDebugEnabled() ? context.tru : context.fals; } - @JRubyMethod(name = "trace?") - public RubyBoolean isTrace(final ThreadContext context) { - return logger.isDebugEnabled() ? context.tru : context.fals; + /** + * {@code logger.trace(msg)} + * @param msg a message to log (will be `to_s` converted) + * @return self + */ + @JRubyMethod + public IRubyObject trace(final IRubyObject msg) { + logger.trace(msg.asString()); + return this; } - @JRubyMethod(name = "debug", required = 1, optional = 1) - public IRubyObject rubyDebug(final ThreadContext context, final IRubyObject[] args) { - if (args.length > 1) { - logger.debug(args[0].asJavaString(), args[1]); - } else { - logger.debug(args[0].asJavaString()); - } + /** + * {@code logger.trace(msg, data)} + * @param msg a message to log (will be `to_s` converted) + * @param data additional contextual data to be logged + * @return self + */ + @JRubyMethod + public IRubyObject trace(final IRubyObject msg, final IRubyObject data) { + if (logger.isTraceEnabled()) logger.trace(msg.toString(), data); return this; } - @JRubyMethod(name = "warn", required = 1, optional = 1) - public IRubyObject rubyWarn(final ThreadContext context, final IRubyObject[] args) { - if (args.length > 1) { - logger.warn(args[0].asJavaString(), args[1]); - } else { - logger.warn(args[0].asJavaString()); - } + /** + * {@code logger.debug(msg)} + * @param msg a message to log (will be `to_s` converted) + * @return self + */ + @JRubyMethod + public IRubyObject debug(final IRubyObject msg) { + logger.debug(msg.asString()); return this; } - @JRubyMethod(name = "info", required = 1, optional = 1) - public IRubyObject rubyInfo(final ThreadContext context, final IRubyObject[] args) { - if (args.length > 1) { - logger.info(args[0].asJavaString(), args[1]); - } else { - logger.info(args[0].asJavaString()); - } + /** + * {@code logger.debug(msg, data)} + * @param msg a message to log (will be `to_s` converted) + * @param data additional contextual data to be logged + * @return self + */ + @JRubyMethod + public IRubyObject debug(final IRubyObject msg, final IRubyObject data) { + if (logger.isDebugEnabled()) logger.debug(msg.toString(), data); return this; } - @JRubyMethod(name = "error", required = 1, optional = 1) - public IRubyObject rubyError(final ThreadContext context, final IRubyObject[] args) { - if (args.length > 1) { - logger.error(args[0].asJavaString(), args[1]); - } else { - logger.error(args[0].asJavaString()); - } + /** + * {@code logger.info(msg)} + * @param msg a message to log (will be `to_s` converted) + * @return self + */ + @JRubyMethod + public IRubyObject info(final IRubyObject msg) { + logger.info(msg.asString()); return this; } - @JRubyMethod(name = "fatal", required = 1, optional = 1) - public IRubyObject rubyFatal(final ThreadContext context, final IRubyObject[] args) { - if (args.length > 1) { - logger.fatal(args[0].asJavaString(), args[1]); - } else { - logger.fatal(args[0].asJavaString()); - } + /** + * {@code logger.info(msg, data)} + * @param msg a message to log (will be `to_s` converted) + * @param data additional contextual data to be logged + * @return self + */ + @JRubyMethod + public IRubyObject info(final IRubyObject msg, final IRubyObject data) { + if (logger.isInfoEnabled()) logger.info(msg.toString(), data); return this; } - @JRubyMethod(name = "trace", required = 1, optional = 1) - public IRubyObject rubyTrace(final ThreadContext context, final IRubyObject[] args) { - if (args.length > 1) { - logger.trace(args[0].asJavaString(), args[1]); - } else { - logger.trace(args[0].asJavaString()); - } + /** + * {@code logger.warn(msg)} + * @param msg a message to log (will be `to_s` converted) + * @return self + */ + @JRubyMethod + public IRubyObject warn(final IRubyObject msg) { + logger.warn(msg.asString()); + return this; + } + + /** + * {@code logger.warn(msg, data)} + * @param msg a message to log (will be `to_s` converted) + * @param data additional contextual data to be logged + * @return self + */ + @JRubyMethod + public IRubyObject warn(final IRubyObject msg, final IRubyObject data) { + logger.warn(msg.toString(), data); + return this; + } + + /** + * {@code logger.error(msg)} + * @param msg a message to log (will be `to_s` converted) + * @return self + */ + @JRubyMethod + public IRubyObject error(final IRubyObject msg) { + logger.error(msg.asString()); + return this; + } + + /** + * {@code logger.error(msg, data)} + * @param msg a message to log (will be `to_s` converted) + * @param data additional contextual data to be logged + * @return self + */ + @JRubyMethod + public IRubyObject error(final IRubyObject msg, final IRubyObject data) { + logger.error(msg.toString(), data); + return this; + } + + /** + * {@code logger.fatal(msg)} + * @param msg a message to log (will be `to_s` converted) + * @return self + */ + @JRubyMethod + public IRubyObject fatal(final IRubyObject msg) { + logger.fatal(msg.asString()); + return this; + } + + /** + * {@code logger.fatal(msg, data)} + * @param msg a message to log (will be `to_s` converted) + * @param data additional contextual data to be logged + * @return self + */ + @JRubyMethod + public IRubyObject fatal(final IRubyObject msg, final IRubyObject data) { + logger.fatal(msg.toString(), data); return this; } diff --git a/logstash-core/src/main/java/org/logstash/log/PluginLoggerExt.java b/logstash-core/src/main/java/org/logstash/log/PluginLoggerExt.java new file mode 100644 index 00000000000..9ad2bef5a1e --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/log/PluginLoggerExt.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.logstash.log; + +import org.jruby.Ruby; +import org.jruby.RubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.anno.JRubyModule; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; + +/** + * A specialized {@link LoggerExt} that injects logging context information. + * + * Ruby interface is exactly the same as with `LogStash::Logging::Logger`. + * + * @since 7.15 + */ +@JRubyModule(name = "LogStash::Logging::PluginLogger") +public class PluginLoggerExt extends LoggerExt { + + private static final long serialVersionUID = 1L; + + static final String PLUGIN_ID_KEY = "plugin.id"; + + private String pluginId; + + public PluginLoggerExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public LoggerExt initialize(final ThreadContext context, final IRubyObject plugin) { + initializeLogger(LoggableExt.log4jName(plugin.getMetaClass())); + this.pluginId = plugin.callMethod(context, "id").asJavaString(); + return this; + } + + @Override + public IRubyObject trace(final IRubyObject msg) { + if (logger.isTraceEnabled()) { + boolean didSet = setPluginId(pluginId); + + logger.trace(msg.asString()); + + removePluginId(didSet); + } + return this; + } + + @Override + public IRubyObject trace(final IRubyObject msg, final IRubyObject data) { + if (logger.isTraceEnabled()) { + boolean didSet = setPluginId(pluginId); + + logger.trace(msg.toString(), data); + + removePluginId(didSet); + } + return this; + } + + @Override + public IRubyObject debug(final IRubyObject msg) { + if (logger.isDebugEnabled()) { + boolean didSet = setPluginId(pluginId); + + logger.debug(msg.asString()); + + removePluginId(didSet); + } + return this; + } + + @Override + public IRubyObject debug(final IRubyObject msg, final IRubyObject data) { + if (logger.isDebugEnabled()) { + boolean didSet = setPluginId(pluginId); + + logger.debug(msg.toString(), data); + + removePluginId(didSet); + } + return this; + } + + @Override + public IRubyObject info(final IRubyObject msg) { + if (logger.isInfoEnabled()) { + boolean didSet = setPluginId(pluginId); + + logger.info(msg.asString()); + + removePluginId(didSet); + } + return this; + } + + @Override + public IRubyObject info(final IRubyObject msg, final IRubyObject data) { + if (logger.isInfoEnabled()) { + boolean didSet = setPluginId(pluginId); + + logger.info(msg.toString(), data); + + removePluginId(didSet); + } + return this; + } + + @Override + public IRubyObject warn(final IRubyObject msg) { + boolean didSet = setPluginId(pluginId); + + logger.warn(msg.asString()); + + removePluginId(didSet); + + return this; + } + + @Override + public IRubyObject warn(final IRubyObject msg, final IRubyObject data) { + boolean didSet = setPluginId(pluginId); + + logger.warn(msg.toString(), data); + + removePluginId(didSet); + + return this; + } + + @Override + public IRubyObject error(final IRubyObject msg) { + boolean didSet = setPluginId(pluginId); + + logger.error(msg.asString()); + + removePluginId(didSet); + + return this; + } + + @Override + public IRubyObject error(final IRubyObject msg, final IRubyObject data) { + boolean didSet = setPluginId(pluginId); + + logger.error(msg.toString(), data); + + removePluginId(didSet); + + return this; + } + + @Override + public IRubyObject fatal(final IRubyObject msg) { + boolean didSet = setPluginId(pluginId); + + logger.fatal(msg.asString()); + + removePluginId(didSet); + + return this; + } + + @Override + public IRubyObject fatal(final IRubyObject msg, final IRubyObject data) { + boolean didSet = setPluginId(pluginId); + + logger.fatal(msg.toString(), data); + + removePluginId(didSet); + + return this; + } + + private static boolean setPluginId(final String pluginId) { + boolean found = org.apache.logging.log4j.ThreadContext.containsKey(PLUGIN_ID_KEY); + if (found) return false; + org.apache.logging.log4j.ThreadContext.put(PLUGIN_ID_KEY, pluginId); + return false; + } + + private static void removePluginId(final boolean didSet) { + if (didSet) org.apache.logging.log4j.ThreadContext.remove(PLUGIN_ID_KEY); + } + +} diff --git a/logstash-core/src/main/java/org/logstash/log/SlowLoggerExt.java b/logstash-core/src/main/java/org/logstash/log/SlowLoggerExt.java index 741200a3246..8f48f2b58af 100644 --- a/logstash-core/src/main/java/org/logstash/log/SlowLoggerExt.java +++ b/logstash-core/src/main/java/org/logstash/log/SlowLoggerExt.java @@ -100,7 +100,7 @@ private RubyHash asData(final ThreadContext context, final IRubyObject pluginPar @JRubyMethod(name = "on_event", required = 4) public IRubyObject onEvent(final ThreadContext context, final IRubyObject[] args) { - String message = args[0].asJavaString(); + String message = args[0].toString(); long eventDurationNanos = ((RubyNumeric)args[3]).getLongValue(); if (warnThreshold >= 0 && eventDurationNanos > warnThreshold) { diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java index acad62ff865..a03f8194735 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java @@ -36,6 +36,8 @@ import java.util.UUID; import java.util.function.Consumer; +import static org.logstash.config.ir.compiler.RubyIntegration.generatePluginId; + /** * Java implementation of the "dots" codec * */ @@ -45,11 +47,11 @@ public class Dots implements Codec { private final String id; public Dots(final String id, final Configuration configuration, final Context context) { - this((id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString()); + this((id != null && !id.isEmpty()) ? id : generatePluginId()); } public Dots(final Configuration configuration, final Context context) { - this(UUID.randomUUID().toString()); + this(generatePluginId()); } private Dots(String id) { @@ -73,7 +75,7 @@ public void encode(Event event, OutputStream out) throws IOException { @Override public Codec cloneCodec() { - return new Dots(UUID.randomUUID().toString()); + return new Dots(generatePluginId()); } @Override diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java index 9e3570c50cd..47970b03981 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java @@ -41,9 +41,9 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import java.util.function.Consumer; +import static org.logstash.config.ir.compiler.RubyIntegration.generatePluginId; import static org.logstash.ObjectMappers.JSON_MAPPER; /** @@ -86,7 +86,7 @@ public class Line implements Codec { */ public Line(final String id, final Configuration configuration, final Context context) { this(context, configuration.get(DELIMITER_CONFIG), configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG), - (id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString()); + (id != null && !id.isEmpty()) ? id : generatePluginId()); } /* @@ -178,6 +178,6 @@ public String getId() { @Override public Codec cloneCodec() { - return new Line(context, delimiter, charset.name(), format, UUID.randomUUID().toString()); + return new Line(context, delimiter, charset.name(), format, null); } } diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java index 6ea5afd148f..6eadcc3931d 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java @@ -39,9 +39,10 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import java.util.function.Consumer; +import static org.logstash.config.ir.compiler.RubyIntegration.generatePluginId; + /** * The plain codec accepts input bytes as events with no decoding beyond the application of a specified * character set. For encoding, an optional format string may be specified. @@ -77,7 +78,7 @@ public class Plain implements Codec { */ public Plain(final String id, final Configuration configuration, final Context context) { this(context, configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG), - (id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString()); + (id != null && !id.isEmpty()) ? id : generatePluginId()); } /** * @param configuration Logstash Configuration @@ -136,6 +137,6 @@ public String getId() { @Override public Codec cloneCodec() { - return new Plain(context, charset.name(), format, UUID.randomUUID().toString()); + return new Plain(context, charset.name(), format, null); } } diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java index 53c6a4c9210..7f76ee03576 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java @@ -82,6 +82,11 @@ public static IRubyObject filterDelegator(final ThreadContext context, return filterDelegatorClass.newInstance(context, filterInstance, id, Block.NULL_BLOCK); } + @JRubyMethod(name = "generate_plugin_id", meta = true) + public static IRubyObject generate_plugin_id(final ThreadContext context, final IRubyObject recv) { + return context.runtime.newString(RubyIntegration.generatePluginId()); + } + public PluginFactoryExt(final Ruby runtime, final RubyClass metaClass) { this(runtime, metaClass, new PluginLookup(PluginRegistry.getInstance(new AliasRegistry()))); } @@ -218,14 +223,15 @@ private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name); if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) { - final Map newArgs = new HashMap<>(args); - newArgs.put("id", id); + final RubyHash rubyArgs = RubyHash.newHash(context.runtime); + rubyArgs.replace(context, args); + rubyArgs.put("id", id); // auto converts String -> RubyString + final RubyClass klass = (RubyClass) pluginClass.klass(); final ExecutionContextExt executionCntx = executionContextFactory.create( context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name") ); - final RubyHash rubyArgs = RubyHash.newHash(context.runtime); - rubyArgs.putAll(newArgs); + if (type == PluginLookup.PluginType.OUTPUT) { return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize( context, @@ -284,21 +290,21 @@ private String generateOrRetrievePluginId(final PluginLookup.PluginType type, final Optional unprocessedId; if (source == null) { unprocessedId = extractId(() -> extractIdFromArgs(args), - this::generateUUID); + () -> generatePluginId(type)); } else { unprocessedId = extractId(() -> extractIdFromLIR(source), () -> extractIdFromArgs(args), - () -> generateUUIDForCodecs(type)); + () -> generatePluginId(type)); // codecs might fall through down here } return unprocessedId .map(configVariables::expand) .filter(String.class::isInstance) .map(String.class::cast) - .orElse(null); + .get(); } - private Optional extractId(final IdExtractor... extractors) { + private static Optional extractId(final IdExtractor... extractors) { for (IdExtractor extractor : extractors) { final Optional extracted = extractor.extract(); if (extracted.isPresent()) { @@ -313,11 +319,7 @@ interface IdExtractor { Optional extract(); } - private Optional extractIdFromArgs(final Map args) { - if (!args.containsKey("id")) { - return Optional.empty(); - } - + private static Optional extractIdFromArgs(final Map args) { final Object explicitId = args.get("id"); if (explicitId instanceof String) { return Optional.of((String) explicitId); @@ -328,15 +330,8 @@ private Optional extractIdFromArgs(final Map args) { } } - private Optional generateUUID() { - return Optional.of(UUID.randomUUID().toString()); - } - - private Optional generateUUIDForCodecs(final PluginLookup.PluginType pluginType) { - if (pluginType == PluginLookup.PluginType.CODEC) { - return generateUUID(); - } - return Optional.empty(); + private static Optional generatePluginId(final PluginLookup.PluginType type) { + return Optional.of(RubyIntegration.generatePluginId()); } private Optional extractIdFromLIR(final SourceWithMetadata source) { @@ -344,7 +339,7 @@ private Optional extractIdFromLIR(final SourceWithMetadata source) { .filter(v -> v.getSourceWithMetadata() != null && v.getSourceWithMetadata().equalsWithoutText(source)) .findFirst() - .map(Vertex::getId); + .map(Vertex::getId); // explicit id if set by user or generates one based on AST } ExecutionContextFactoryExt getExecutionContextFactory() {