Skip to content

Commit

Permalink
Fix issue logstash-plugins#6 - use missing calls from Plugin API - st…
Browse files Browse the repository at this point in the history
…op, close, stop?
  • Loading branch information
AgentW4C committed Feb 3, 2017
1 parent e26df4d commit 75ad3bd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
- Breaking: Updated plugin to use new Java Event APIs
- relax logstash-core-plugin-api constrains
- update .travis.yml
- fix issue #6 - use missing calls from Plugin API - stop, close, stop?

# 2.0.4
- Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash
Expand Down
51 changes: 28 additions & 23 deletions lib/logstash/inputs/jms.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
#
# For more information about Jms, see <http://docs.oracle.com/javaee/6/tutorial/doc/bncdq.html>
# For more information about the Ruby Gem used, see <http://github.com/reidmorrison/jruby-jms>
# Here is a config example :
# Here is a config example to pull from a queue:
# jms {
# include_header => false
# include_properties => false
# include_body => true
# use_jms_timestamp => false
# interval => 10
# queue_name => "myqueue"
# destination => "myqueue"
# pub-sub => false
# yaml_file => "~/jms.yml"
# yaml_section => "mybroker"
# }
Expand Down Expand Up @@ -57,7 +58,7 @@ class LogStash::Inputs::Jms < LogStash::Inputs::Threadable
# This parameter has non influence in the case of a subcribed Topic.
config :interval, :validate => :number, :default => 10

# If pub-sub (topic) style should be used or not.
# If pub-sub (topic) style should be used.
config :pub_sub, :validate => :boolean, :default => false

# Name of the destination queue or topic to use.
Expand Down Expand Up @@ -137,7 +138,7 @@ def queue_event(msg, output_queue)
if msg.java_kind_of?(JMS::MapMessage)
event = LogStash::Event.new
msg.data.each do |field, value|
event[field.to_s] = value # TODO(claveau): needs codec.decode or converter.convert ?
event.set(field.to_s, value) # TODO(claveau): needs codec.decode or converter.convert ?
end
elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage)
@codec.decode(msg.to_s) do |event_message|
Expand All @@ -152,7 +153,7 @@ def queue_event(msg, output_queue)

# Here, we can use the JMS Enqueue timestamp as the @timestamp
if @use_jms_timestamp && msg.jms_timestamp
event.timestamp = LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000)
event.set("@timestamp", LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000))
end

if @include_header
Expand Down Expand Up @@ -182,20 +183,19 @@ def queue_event(msg, output_queue)
def run_consumer(output_queue)
JMS::Connection.session(@jms_config) do |session|
destination_key = @pub_sub ? :topic_name : :queue_name
while(true)
while !stop?
session.consume(destination_key => @destination, :timeout=>@timeout, :selector => @selector) do |message|
queue_event message, output_queue
break if stop?
end
sleep @interval
end
end
rescue LogStash::ShutdownSignal
# Do nothing, let us quit.
rescue => e
@logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace)
sleep(10)
retry
end # def run
retry unless stop?
end # def run_consumer

# Consume all available messages on the queue through a listener
private
Expand All @@ -210,19 +210,15 @@ def run_thread(output_queue)
queue_event message, output_queue
end
connection.start
while(true)
while !stop?
@logger.debug("JMS Thread sleeping ...")
sleep @interval
end
rescue LogStash::ShutdownSignal
connection.close
rescue => e
@logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace)
sleep(10)
retry
end # def run


retry unless stop?
end # def run_thread

# Consume all available messages on the queue through a listener
private
Expand All @@ -241,18 +237,16 @@ def run_async(output_queue)
end
# Since the on_message handler above is in a separate thread the thread needs
# to do some other work. It will just sleep for 10 seconds.
while(true)
while !stop?
@logger.debug("JMS Thread sleeping ...")
sleep @interval
end
end
rescue LogStash::ShutdownSignal
# Do nothing, let us quit.
rescue => e
@logger.warn("JMS Consumer died", :exception => e, :backtrace => e.backtrace)
sleep(10)
retry
end # def run

retry unless stop?
end # def run_async

public
def run(output_queue)
Expand All @@ -266,4 +260,15 @@ def run(output_queue)
end
end # def run

public
def close
@logger.info("Closing JMS connection")
@connection.close rescue nil
end # def close

public
def stop
@logger.info("Stopping JMS consumer")
@connection.stop rescue nil
end # def stop
end # class LogStash::Inputs::Jms
5 changes: 2 additions & 3 deletions logstash-input-jms.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'logstash-codec-json', '~> 3.0'
s.add_runtime_dependency 'logstash-codec-plain', '~> 3.0'

if RUBY_PLATFORM == 'java'
s.platform = RUBY_PLATFORM
Expand Down

0 comments on commit 75ad3bd

Please sign in to comment.