From 75ad3bd3652049a2b198595cfeaa55bad7394e1f Mon Sep 17 00:00:00 2001 From: Jiri Kadlcik Date: Fri, 3 Feb 2017 22:58:31 +0100 Subject: [PATCH] Fix issue #6 - use missing calls from Plugin API - stop, close, stop? --- CHANGELOG.md | 1 + lib/logstash/inputs/jms.rb | 51 +++++++++++++++++++++----------------- logstash-input-jms.gemspec | 5 ++-- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e47122a..78c1439 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ - Breaking: Updated plugin to use new Java Event APIs - relax logstash-core-plugin-api constrains - update .travis.yml + - fix issue #6 - use missing calls from Plugin API - stop, close, stop? # 2.0.4 - Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash diff --git a/lib/logstash/inputs/jms.rb b/lib/logstash/inputs/jms.rb index 4b1c928..31588bd 100644 --- a/lib/logstash/inputs/jms.rb +++ b/lib/logstash/inputs/jms.rb @@ -7,14 +7,15 @@ # # For more information about Jms, see # For more information about the Ruby Gem used, see -# Here is a config example : +# Here is a config example to pull from a queue: # jms { # include_header => false # include_properties => false # include_body => true # use_jms_timestamp => false # interval => 10 -# queue_name => "myqueue" +# destination => "myqueue" +# pub-sub => false # yaml_file => "~/jms.yml" # yaml_section => "mybroker" # } @@ -57,7 +58,7 @@ class LogStash::Inputs::Jms < LogStash::Inputs::Threadable # This parameter has non influence in the case of a subcribed Topic. config :interval, :validate => :number, :default => 10 - # If pub-sub (topic) style should be used or not. + # If pub-sub (topic) style should be used. config :pub_sub, :validate => :boolean, :default => false # Name of the destination queue or topic to use. @@ -137,7 +138,7 @@ def queue_event(msg, output_queue) if msg.java_kind_of?(JMS::MapMessage) event = LogStash::Event.new msg.data.each do |field, value| - event[field.to_s] = value # TODO(claveau): needs codec.decode or converter.convert ? + event.set(field.to_s, value) # TODO(claveau): needs codec.decode or converter.convert ? end elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage) @codec.decode(msg.to_s) do |event_message| @@ -152,7 +153,7 @@ def queue_event(msg, output_queue) # Here, we can use the JMS Enqueue timestamp as the @timestamp if @use_jms_timestamp && msg.jms_timestamp - event.timestamp = LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000) + event.set("@timestamp", LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000)) end if @include_header @@ -182,20 +183,19 @@ def queue_event(msg, output_queue) def run_consumer(output_queue) JMS::Connection.session(@jms_config) do |session| destination_key = @pub_sub ? :topic_name : :queue_name - while(true) + while !stop? session.consume(destination_key => @destination, :timeout=>@timeout, :selector => @selector) do |message| queue_event message, output_queue + break if stop? end sleep @interval end end - rescue LogStash::ShutdownSignal - # Do nothing, let us quit. rescue => e @logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace) sleep(10) - retry - end # def run + retry unless stop? + end # def run_consumer # Consume all available messages on the queue through a listener private @@ -210,19 +210,15 @@ def run_thread(output_queue) queue_event message, output_queue end connection.start - while(true) + while !stop? @logger.debug("JMS Thread sleeping ...") sleep @interval end - rescue LogStash::ShutdownSignal - connection.close rescue => e @logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace) sleep(10) - retry - end # def run - - + retry unless stop? + end # def run_thread # Consume all available messages on the queue through a listener private @@ -241,18 +237,16 @@ def run_async(output_queue) end # Since the on_message handler above is in a separate thread the thread needs # to do some other work. It will just sleep for 10 seconds. - while(true) + while !stop? + @logger.debug("JMS Thread sleeping ...") sleep @interval end end - rescue LogStash::ShutdownSignal - # Do nothing, let us quit. rescue => e @logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace) sleep(10) - retry - end # def run - + retry unless stop? + end # def run_async public def run(output_queue) @@ -266,4 +260,15 @@ def run(output_queue) end end # def run + public + def close + @logger.info("Closing JMS connection") + @connection.close rescue nil + end # def close + + public + def stop + @logger.info("Stopping JMS consumer") + @connection.stop rescue nil + end # def stop end # class LogStash::Inputs::Jms diff --git a/logstash-input-jms.gemspec b/logstash-input-jms.gemspec index 9a6aae7..36732db 100644 --- a/logstash-input-jms.gemspec +++ b/logstash-input-jms.gemspec @@ -21,9 +21,8 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - - s.add_runtime_dependency 'logstash-codec-json' - s.add_runtime_dependency 'logstash-codec-plain' + s.add_runtime_dependency 'logstash-codec-json', '~> 3.0' + s.add_runtime_dependency 'logstash-codec-plain', '~> 3.0' if RUBY_PLATFORM == 'java' s.platform = RUBY_PLATFORM