From 8e7cc1b55ce0df34b2a6f7fc3f469b9f79c3e75d Mon Sep 17 00:00:00 2001 From: Andrius Kairiukstis Date: Wed, 7 Aug 2019 12:07:46 +0200 Subject: [PATCH] Final (almost) version of AMI client --- spec/ami_hooks_spec.cr | 35 ++++++ spec/ami_loadtest_spec.cr | 19 +++- spec/ami_spec.cr | 26 +++++ spec/asterisk_connection_spec.cr | 2 +- src/asterisk/ami.cr | 188 +++++++++++++++++-------------- src/asterisk/ami/receiver.cr | 55 +++++---- 6 files changed, 214 insertions(+), 111 deletions(-) create mode 100644 spec/ami_hooks_spec.cr diff --git a/spec/ami_hooks_spec.cr b/spec/ami_hooks_spec.cr new file mode 100644 index 0000000..1336a6c --- /dev/null +++ b/spec/ami_hooks_spec.cr @@ -0,0 +1,35 @@ +require "./spec_helper" + +describe Asterisk::AMI do + # Testing basic actions + describe "#on_hooks" do + it "should trigger 'FullyBooted' callback" do + # can't use with_ami wrapper: FullyBooted aproaching next after login + ami = Asterisk::AMI.new username: "asterisk.cr", secret: "asterisk.cr" + + ami.on_event("FullyBooted") do |ami, event| + event.event.should eq("FullyBooted") + event["status"].should match /Fully Booted/i + # runner already processed "FullyBooted" and ami.connected? expected to + # be true + # pp ami + # ami.connected?.should be_true + end + + ami.login + # callback will be triggered by AMI right after login action + sleep 0.2 + ami.logoff + end + + it "should trigger 'on_close' callback" do + with_ami do |ami| + ami.on_close do |ami| + # AMI connection should be closed already + ami.connected?.should be_false + end + end + end + + end +end diff --git a/spec/ami_loadtest_spec.cr b/spec/ami_loadtest_spec.cr index da2f9d2..84e5216 100644 --- a/spec/ami_loadtest_spec.cr +++ b/spec/ami_loadtest_spec.cr @@ -4,9 +4,18 @@ describe Asterisk::AMI do it "should correctly respond to all the invoked events within heavy loaded Asterisk" do # # less noicy, please # Asterisk.logger.level = Logger::ERROR + + # consider increasing expects_answer_before timeout for higher value of + # fibers_count, CPU load could be high + expects_answer_before = 1.0 + + # how many parallel AMI connections should be tested fibers_count = 10 - test_loops_count = 500 fibers = Channel(Nil).new(fibers_count) + + # how much loops of tests to execute + test_loops_count = 500 + fibers_count.times do |spawn_no| spawn_no_pretty = "0000#{spawn_no}"[-4, 4] foobar = "foobar_#{spawn_no_pretty}" @@ -29,7 +38,7 @@ describe Asterisk::AMI do response.message.should match /Variable Set/i actionid = Random::Secure.hex(8) - response = ami.send_action({"action" => "SIPpeers", "actionid" => actionid}, expects_answer_before: 0.5) + response = ami.send_action({"action" => "SIPpeers", "actionid" => actionid}, expects_answer_before: expects_answer_before) response.success?.should be_true response.actionid.should eq(actionid) response["eventlist"].should match /^start$/i @@ -65,10 +74,12 @@ describe Asterisk::AMI do # short random pause after loop to randomize fibers data sleep 0.002 + rand(0.05) + + rescue ex + puts %(\n\nAMI loadtest spec: #{ex.class}:#{ex.message}\n#{ex.backtrace.pretty_inspect}\n\nLatest response: #{response.inspect rescue "-- n/a ---"}\n\n) + break end # test loop - rescue ex - puts "\n\nAMI loadtest spec: #{ex.class}:#{ex.message}\n#{ex.backtrace.pretty_inspect}\n\n" ensure fibers.send nil end # with_ami diff --git a/spec/ami_spec.cr b/spec/ami_spec.cr index 1706427..235ffbe 100644 --- a/spec/ami_spec.cr +++ b/spec/ami_spec.cr @@ -3,6 +3,14 @@ require "./spec_helper" describe Asterisk::AMI do # Testing basic actions describe "#basic_actions" do + it "after login, client should set ami_version, asterisk_version and asterisk_platform" do + with_ami do |ami| + ami.ami_version.should_not be_nil + ami.asterisk_version.should_not be_nil + ami.asterisk_platform.should_not be_nil + end + end + it "should respond with 'Pong' to action 'ping'" do with_ami do |ami| 10.times do |i| @@ -123,5 +131,23 @@ describe Asterisk::AMI do end end + it "ami method 'command' should return expected result" do + with_ami do |ami| + # returns one line + ami.command("core show version") =~ /Asterisk (\d{1,2}.\d{1,2}.\d{1,2}).+on a (\S+)/ + asterisk_version = $1 + asterisk_platform = $2 + asterisk_version.should be_a(String) + asterisk_platform.should be_a(String) + + # returns array (two lines) + ami.command("core show uptime").as(Array(String)).join("\n") =~ /(?|uptime: (.+)|reload: (.+))/i + uptime = $1 + last_reload = $1 + uptime.should be_a(String) + last_reload.should be_a(String) + end + end + end end diff --git a/spec/asterisk_connection_spec.cr b/spec/asterisk_connection_spec.cr index f20a745..1ed5351 100644 --- a/spec/asterisk_connection_spec.cr +++ b/spec/asterisk_connection_spec.cr @@ -16,7 +16,7 @@ describe Asterisk::Server do end it "after start, Asterisk Manager should listen on 127.0.0.1:5038" do - Asterisk::Server.port_is_open?("5038").should be_true + Asterisk::Server.port_open?("5038").should be_true end it "should stop by 'core stop now' CLI command" do diff --git a/src/asterisk/ami.cr b/src/asterisk/ami.cr index 6bb2f54..2193e1f 100644 --- a/src/asterisk/ami.cr +++ b/src/asterisk/ami.cr @@ -5,17 +5,21 @@ require "./ami/*" module Asterisk class AMI - getter logger : Logger = Asterisk.logger - - @conn = TCPSocket.new - @running = false - @fully_booted = false - - getter receiver : Receiver = Receiver.new - + alias EventName = String alias ActionID = String alias AMIData = Hash(String, String | Array(String)) + @conn = TCPSocket.new + @connected = false + @running = false + @fully_booted = false + @event_callbacks = Hash(EventName, Proc(AMI, Event, Nil)).new + getter logger : Logger = Asterisk.logger + getter receiver : Receiver = Receiver.new(logger: logger) + getter ami_version : String? + getter asterisk_version : String? + getter asterisk_platform : String? + class LoginError < Exception end @@ -25,38 +29,59 @@ module Asterisk class ConnectionError < Exception end + # close client and raise error private def raise(ex : Exception) - # close everything and raise error - logoff! - ensure + close ::raise ex end + # on_close callback + def on_close(&@on_close : AMI ->) + end + + # on_event callback (event name, AMI instance, event body) + def on_event(event : EventName, &block : AMI, Event ->) + @event_callbacks[event.to_s.downcase] = block + end + def initialize(@host = "127.0.0.1", @port = "5038", @username = "", @secret = "", @logger : Logger = Asterisk.logger) end + def connected? + @connected && running? && fully_booted? + end + def login + raise LoginError.new("Already connected, logoff first") if @connected @conn = TCPSocket.new(@host, @port) @conn.sync = true @conn.keepalive = false run response = send_action({"action" => "Login", "username" => @username, "secret" => @secret}) - logger.debug "#{self.class}.login response: #{response}" if response.success? - # running but FullyBooted event shall be also processed + # {"unknown" => "Asterisk Call Manager/2.10.5", + # "response" => "Success", + # "message" => "Authentication accepted"} + @ami_version = response["unknown"].as(String).split("/").last + @connected = true + # AMI should enqueue FullyBooted event that will be processed by runner sleep 0.03 unless fully_booted? - raise NotBootedError.new("Asterisk did not respond with FullyBooted event") + raise NotBootedError.new("After logn, AMI shoud respond with FullyBooted event") end + # last thing, get asterisk version + version_information = command("core show version") + version_information =~ /Asterisk (\d{1,2}.\d{1,2}.\d{1,2}).+on a (\S+)/ + @asterisk_version = $1 + @asterisk_platform = $2 + logger.debug "#{self.class}.login: Logged in" else raise LoginError.new(response.message) end end def logoff - logger.debug "#{self.class}.logoff: Preparing" if running? - logger.debug "#{self.class}.logoff: Logging off" response = send_action({"action" => "Logoff"}) # {"response" => "Goodbye", "message" => "Thanks for all the fish."} if response.response == "Goodbye" @@ -65,101 +90,98 @@ module Asterisk logger.error "#{self.class}.logoff: Logged off with incorrect response: #{response}" end end - ensure - logoff! + close end - private def logoff! - @running = false - @fully_booted = false - @conn.close - logger.debug "#{self.class}.logoff!: Disconnected!" + def command(command : String) : String | Array(String) + result = send_action({"action" => "Command", "command" => command}).output + if result.size == 1 + result.first + else + result + end end - def send_action(action : AMIData, expects_answer_before : Float64 = 0.0) + # increase expects_answer_before with heavy loaded CPU + def send_action(action : AMIData, expects_answer_before : Float64 = 0.3) actionid = action["actionid"] ||= UUID.random.to_s - @receiver = Receiver.new(actionid: actionid, expects_answer_before: expects_answer_before, logger: logger) - response = receiver.get do - send!(action) - logger.debug "#{self.class}.send_action: sending #{action}" - logger.debug "#{self.class}.send_action: sent, waiting for response" + @receiver = Receiver.new(logger: logger) + response = receiver.get(actionid: actionid, expects_answer_before: expects_answer_before) do + conn_send(action) end logger.debug "#{self.class}.send_action: response received: #{response.inspect}" response end + # Format action as a multiline string delimited by "\r\n" and send it + # through AMI TCPSocket connection + private def conn_send(action : AMIData) + # Asterisk AMI action is a multi-line string delimited by "\r\n" following + # with one empty strring + action_s = "" + action.each do |k, v| + action_s += "#{k}: #{v}\r\n" + end + action_s += "\r\n" + @conn << action_s + rescue ex + raise ex + end + private def run - raise LoginError.new("Already running!") if running? - running! + @running = true spawn do logger.debug "#{self.class}.run: Starting" while running? - io_data = read! - # logger.debug "#{self.class}.run: <<< AMI data received: #{data}" + io_data = conn_read data = format(io_data) logger.debug "#{self.class}.run: Formatted data: #{data.inspect}" + # Are asterisk get terminated elsewhere? + # (empty strings are coming to the AMI interface in some cases of + # forced process termination; in such case Asterisk does not send + # "Shutdown" event + close if data.to_h == {"unknown" => ""} + if receiver.waiting? # received message is an AMI unstructured (text) information # message that comes as response right after actin was send OR # that's response of action containing same actionid as receiver - if ! data.is_a?(Event) && data.actionid?.nil? && ! data.response_present? + if !data.is_a?(Event) && data.actionid?.nil? && !data.response_present? receiver.send data - logger.debug "#{self.class}.run: <<< sending response: #{data.inspect} to receiver: #{receiver.inspect}" + # logger.debug "#{self.class}.run: <<< sending response: #{data.inspect} to receiver: #{receiver.inspect}" elsif data.actionid_present? && data.actionid == receiver.actionid receiver.send data - logger.debug "#{self.class}.run: <<< sending response: #{data.inspect} to receiver: #{receiver.inspect}" + # logger.debug "#{self.class}.run: <<< sending response: #{data.inspect} to receiver: #{receiver.inspect}" end end if data.is_a?(Event) - # do something with data, process hooks etc! - # here... TODO... - # FullyBooted event raised by AMI when all Asterisk initialization # procedures have finished. - fully_booted! if data.event == "FullyBooted" + @fully_booted = true if data.event == "FullyBooted" # Does asterisk get terminated elsewhere? - logoff! if data.event == "Shutdown" - end + close if data.event == "Shutdown" - # Does asterisk get terminated elsewhere? - # (empty strings are coming to the AMI interface in some cases of - # forced process termination; in such case Asterisk does not send - # "Shutdown" event - logoff! if data.to_h == {"unknown" => ""} + # do something with data, process hooks etc! + trigger_callback data + end end - logger.debug "#{self.class}.run: Connection gone, login again!" - end - end - - # Format action as a multiline string delimited by "\r\n" and send it - # through AMI TCPSocket connection - private def send!(action : AMIData) - multiline_string = "" - action.each do |k, v| - multiline_string += "#{k}: #{v}\r\n" end - # ending string - multiline_string += "\r\n" - # send! TODO: rescue errors - @conn << multiline_string - rescue ex - raise ex end - # Read data from AMI. Usually it's an AMI event, that could be formatted as - # a json/hash, but it could be also an confirmation to the past action both - # as a hash or as a string. - # Data, that AMi returns is a set of a single or multiple strings - # delimitered by "\r\n" at the end one more terminating ("\r\n") - private def read! : String - @conn.gets("\r\n\r\n").to_s + # Read AMI data, which is event, or response/confirmation to the enqueued + # action. AMI always return data as a set of multiple strings + # delimitered by "\r\n" with one empty string at the end ("\r\n\r\n") + private def conn_read : String + data = @conn.gets("\r\n\r\n").to_s + # logger.debug "#{self.class}.conn_read: <<< AMI data received: #{data}" + data rescue IO::Timeout raise ConnectionError.new("TCPSocket timeout error") rescue ex - # Errno Bad file descriptor could come after connection get closed + # Connection error triggered after @conn.close could be ignored if running? raise ex else @@ -192,7 +214,7 @@ module Asterisk # Logic for # ```{"action" => "Command", "command" => "..."}``` if cli_command && previous_key == "actionid" - result["output"] = line.gsub(/--END COMMAND--$/, "").split("\n") + result["output"] = line.gsub(/--END COMMAND--$/, "").chomp.split("\n") break end @@ -238,20 +260,22 @@ module Asterisk end end - def connected? - running? && fully_booted? + private def trigger_callback(event : Event) + name = event.event.to_s.downcase + @event_callbacks[name]?.try &.call(self, event) end - private def running? - @running - end - - private def running! - @running = true + private def close + return unless running? + @connected = false + @running = false + @fully_booted = false + @conn.close + @on_close.try &.call(self) end - private def fully_booted! - @fully_booted = true + private def running? + @running end private def fully_booted? diff --git a/src/asterisk/ami/receiver.cr b/src/asterisk/ami/receiver.cr index ade4c58..3f18c76 100644 --- a/src/asterisk/ami/receiver.cr +++ b/src/asterisk/ami/receiver.cr @@ -7,55 +7,62 @@ module Asterisk class Receiver WAIT_FOR_ANSWER = 0.001 - property actionid : ActionID? - getter expects_answer_before : Float64 + getter actionid : ActionID? + getter expects_answer_before : Float64 = 0.3 getter logger : Logger getter response_channel : Channel::Unbuffered(Response) - def initialize(@actionid : ActionID? = nil, @expects_answer_before : Float64 = 0.0, @logger : Logger = Asterisk.logger) + def initialize(@logger : Logger = Asterisk.logger) # start with closed Receiver @response_channel = Channel::Unbuffered(Response).new - response_channel.close + close + end + + def send(data : Response | Event) + response_channel.send data end - def get : Response + def get(@actionid : ActionID, @expects_answer_before : Float64 = 0.3) : Response response = begin @response_channel = Channel::Unbuffered(Response).new yield - stop_after expects_answer_before + close_after expects_answer_before response_channel.receive rescue Channel::ClosedError - Response.new({"response" => "Error", "message" => "Timeout error while waiting for AMI response"}) + Response.new({"response" => "Error", "message" => "Timeout while waiting for AMI response", "expects_answer_before" => expects_answer_before.to_s}) end ensure - terminate! + close logger.debug "#{self.class}.get: received #{response.inspect}" response end - private def stop_after(expects_answer_before : Float64) - if expects_answer_before > 0.0 - spawn do - started_at = Time.now - while (Time.now - started_at).to_f < expects_answer_before - sleep WAIT_FOR_ANSWER - end - terminate! - logger.debug "#{self.class}.stop_after: terminated" - end - end + def closed? + response_channel.closed? end - def send(data : Response) - response_channel.send data + # if Receiver instance is open, AMI runner might send response or event to + # the response_channel + def waiting? + ! closed? end - def waiting? - ! response_channel.closed? + # close response_channel after given timeout + private def close_after(timeout : Float64) + timeout = 0.3 if timeout < 0.3 + spawn do + started_at = Time.now + while (Time.now - started_at).to_f < timeout + sleep WAIT_FOR_ANSWER + end + close + end end - def terminate! + # close response_channel + private def close + return if closed? @actionid = nil response_channel.close sleep 0.001