diff --git a/example/ami_test.go b/example/ami_test.go index 30b212c..4478707 100644 --- a/example/ami_test.go +++ b/example/ami_test.go @@ -6,6 +6,9 @@ import ( "github.com/pnguyen215/voipkit/pkg/ami" ) -func TestLogger(t *testing.T) { - ami.D().Info("Test information: %v", 1) +func TestAmiClient(t *testing.T) { + c := ami.GetAmiClientSample() + ami.D().Info("ami client request: %v", c.String()) + + ami.NewClient(ami.NewTcp(), *c) } diff --git a/pkg/ami/ami.go b/pkg/ami/ami.go index 71e4dd1..371b333 100644 --- a/pkg/ami/ami.go +++ b/pkg/ami/ami.go @@ -1,60 +1,87 @@ package ami import ( + "bufio" "context" "net" + "net/textproto" "strings" "time" "github.com/pnguyen215/voipkit/pkg/ami/config" ) -// NewAmi connect to Asterisk Server using net connection and try to login -func NewAmi(host string, port int, username, password string) (*AMI, error) { - conn, err := OnTcpConn(host, port) - if err != nil { - return nil, err - } - return NewAmiDial(conn, username, password) +func (c *AMI) EmitError(err error) { + go func(err error) { + c.Mutex.Lock() + defer c.Mutex.Unlock() + if err == nil || c.Err == nil { + return + } + // c.mu.Lock() + // defer c.mu.Unlock() + c.Err <- err + }(err) } -// NewAmiDial connect to Asterisk Server using net connection and try to login -func NewAmiDial(conn net.Conn, username, password string) (*AMI, error) { - return NewAmiWith(conn, username, password, config.NetworkTimeoutAfterSeconds) +// Error returns channel for error and signals that client should be probably restarted +func (c *AMI) Error() <-chan error { + c.Mutex.RLock() + defer c.Mutex.RUnlock() + return c.Err } -// NewAmi connect to Asterisk Server using net connection and try to login -func NewAmiWithTimeout(host string, port int, username, password string, timeout time.Duration) (*AMI, error) { - conn, err := OnTcpConn(host, port) +// Close client and destroy all subscriptions to events and action responses +func (c *AMI) Close() { + c.Mutex.Lock() + defer c.Mutex.Unlock() + c.Cancel() + c.Subs.Destroy() + c.Conn.Close() + close(c.Err) + c.Err = nil +} + +// Action send AMI message to Asterisk server and returns send-only +// response channel on nil +func (c *AMI) Action(message *AMIMessage) bool { + if message.GetActionId() == "" { + message.AddActionId() + } + err := c.write(message.Bytes()) if err != nil { - return nil, err + c.EmitError(err) + return false } - return NewAmiWith(conn, username, password, timeout) + return true } -// NewAmiWith connect to Asterisk Server using net connection and try to login -// using username, password and timeout after seconds. Create new Asterisk Manager Interface (AMI) and return client or error -func NewAmiWith(conn net.Conn, username, password string, timeout time.Duration) (*AMI, error) { - client, ctx := OnConnContext(conn) - - err := client.ReadPrompt(ctx, timeout) +// AllEvents subscribes to any AMI message received from Asterisk server +// returns send-only channel or nil +func (c *AMI) AllEvents() <-chan *AMIMessage { + return c.Subs.Subscribe(config.AmiPubSubKeyRef) +} - if err != nil { - return nil, err - } +// OnEvent subscribes by event name (case insensitive) and +// returns send-only channel or nil +func (c *AMI) OnEvent(name string) <-chan *AMIMessage { + return c.Subs.Subscribe(name) +} - err = client.Login(ctx, timeout, username, password) +// OnEvents subscribes by events name (case insensitive) and +// return send-only channel or nil +func (c *AMI) OnEvents(keys ...string) <-chan *AMIMessage { + return c.Subs.Subscribes(keys...) +} - if err != nil { - return nil, err +func (c *AMI) publish(message *AMIMessage) { + if message != nil { + c.Subs.Publish(message) } - - client.SetReader(ctx) - return client, nil } -func (c *AMI) ReadPrompt(parentCtx context.Context, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(parentCtx, timeout) +func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(_ctx, timeout) defer cancel() reader := func() (chan string, chan error) { prompt := make(chan string) @@ -76,194 +103,160 @@ func (c *AMI) ReadPrompt(parentCtx context.Context, timeout time.Duration) error case <-ctx.Done(): return ErrorAsteriskConnTimeout case err := <-fail: - return ErrorAsteriskNetwork.AMIError(err.Error()) + return ErrorAsteriskNetwork.ErrorWrap(err.Error()) case promptLine := <-prompt: if !strings.HasPrefix(promptLine, config.AmiCallManagerKey) { - return ErrorAsteriskInvalidPrompt.AMIError(promptLine) + return ErrorAsteriskInvalidPrompt.ErrorWrap(promptLine) } } return nil } -func (c *AMI) Read() (*AMIMessage, error) { +func (c *AMI) read() (*AMIMessage, error) { headers, err := c.Reader.ReadMIMEHeader() - if err != nil { if err.Error() == ErrorEOF || err.Error() == ErrorIO { return nil, ErrorAsteriskNetwork } return nil, err } - message := ofMessage(headers) - // add callback value c.Raw = message return message, nil } -func (c *AMI) Write(bytes []byte) error { +func (c *AMI) write(bytes []byte) error { c.Mutex.Lock() defer c.Mutex.Unlock() - _, err := c.Writer.Write(bytes) if err != nil { return err } - err = c.Writer.Flush() if err != nil { return err } - return nil } -func (c *AMI) Login(parentContext context.Context, timeout time.Duration, username, password string) error { - ctx, cancel := context.WithTimeout(parentContext, timeout) +func (c *AMI) release(ctx context.Context) { + c.Subs = NewPubSubQueue() + c.Err = make(chan error) + go func() { + defer c.Subs.Disabled() + for { + select { + case <-ctx.Done(): + c.EmitError(ctx.Err()) + return + default: + message, err := c.read() + if ctx.Err() != nil { + c.EmitError(ctx.Err()) + return + } + if err != nil { + if err == ErrorAsteriskNetwork { + c.EmitError(err) + return + } + } + c.Raw = message + c.publish(message) + } + } + }() +} + +func (c *AMI) authenticate(_ctx context.Context, request AmiClient) error { + ctx, cancel := context.WithTimeout(_ctx, request.timeout) defer cancel() - message := LoginWith(username, password) + message := Authenticate(request.username, request.password) reader := func() (chan *AMIMessage, chan error) { chMessage := make(chan *AMIMessage) fail := make(chan error) go func() { defer close(chMessage) defer close(fail) - - _message, err := c.Read() - + _message, err := c.read() if err != nil { fail <- err return } - chMessage <- _message }() - return chMessage, fail } - - chMessage, fail := reader() - if err := c.Write(message.Bytes()); err != nil { + message_reader, fail := reader() + if err := c.write(message.Bytes()); err != nil { return err } - select { case <-ctx.Done(): - return ErrorAsteriskConnTimeout.AMIError(ErrorLoginFailed) + return ErrorAsteriskConnTimeout.ErrorWrap(ErrorAuthenticatedUnsuccessfully) case err := <-fail: - return ErrorAsteriskNetwork.AMIError(err.Error()) - case msg := <-chMessage: + return ErrorAsteriskNetwork.ErrorWrap(err.Error()) + case msg := <-message_reader: if !msg.IsSuccess() { - return ErrorAsteriskLogin + return ErrorAsteriskAuthenticated } else { - // add callback value c.Raw = msg } } return nil } -func (c *AMI) Publish(message *AMIMessage) { - if message != nil { - c.Subs.Publish(message) +// create initializes a new AMI client with the provided network connection. +// It returns the initialized AMI client along with a cancellable context for managing its lifecycle. +// +// Parameters: +// - conn: The network connection used by the AMI client. +// +// Returns: +// - An initialized AMI client (*AMI). +// - A cancellable context for managing the AMI client's lifecycle (context.Context). +// +// Example: +// +// // Creating an AMI client with a network connection +// conn, err := net.Dial("tcp", "localhost:5038") +// if err != nil { +// log.Fatal(err) +// } +// client, ctx := create(conn) +// // Use the client and context for AMI operations. +// // Make sure to close the connection and cancel the context when done. +// defer client.Close() +// defer client.Cancel() +func create(conn net.Conn) (*AMI, context.Context) { + ctx, cancel := context.WithCancel(context.Background()) + c := &AMI{ + Reader: textproto.NewReader(bufio.NewReader(conn)), + Writer: bufio.NewWriter(conn), + Conn: conn, + Cancel: cancel, } -} - -func (c *AMI) EmitError(err error) { - go func(err error) { - c.Mutex.Lock() - defer c.Mutex.Unlock() - - if err == nil || c.Err == nil { - return - } - // c.mu.Lock() - // defer c.mu.Unlock() - c.Err <- err - }(err) -} - -func (c *AMI) SetReader(ctx context.Context) { - c.Subs = NewPubSubQueue() - c.Err = make(chan error) - - go func() { - defer c.Subs.Disabled() - for { - select { - case <-ctx.Done(): - c.EmitError(ctx.Err()) - return - default: - message, err := c.Read() - - if ctx.Err() != nil { - c.EmitError(ctx.Err()) - return - } - - if err != nil { - if err == ErrorAsteriskNetwork { - c.EmitError(err) - return - } - } - // add callback value - c.Raw = message - c.Publish(message) - } + if conn != nil { + addr := conn.RemoteAddr().String() + _socket, err := WithSocket(ctx, addr) + if err == nil { + c.Socket = _socket + D().Info("Ami cloning (addr: %v) socket connection succeeded", addr) } - }() -} - -// Error returns channel for error and signals that client should be probably restarted -func (c *AMI) Error() <-chan error { - c.Mutex.RLock() - defer c.Mutex.RUnlock() - return c.Err -} - -// Close client and destroy all subscriptions to events and action responses -func (c *AMI) Close() { - c.Mutex.Lock() - defer c.Mutex.Unlock() - c.Cancel() - c.Subs.Destroy() - c.Conn.Close() - close(c.Err) - c.Err = nil + } + return c, ctx } -// Action sends AMI message to Asterisk server and returns send-only -// response channel on nil -func (c *AMI) Action(message *AMIMessage) bool { - if message.GetActionId() == "" { - message.AddActionId() +func serve(conn net.Conn, request AmiClient) (*AMI, error) { + ins, ctx := create(conn) + err := ins.apply(ctx, request.timeout) + if err != nil { + return nil, err } - - err := c.Write(message.Bytes()) + err = ins.authenticate(ctx, request) if err != nil { - c.EmitError(err) - return false + return nil, err } - - return true -} - -// AllEvents subscribes to any AMI message received from Asterisk server -// returns send-only channel or nil -func (c *AMI) AllEvents() <-chan *AMIMessage { - return c.Subs.Subscribe(config.AmiPubSubKeyRef) -} - -// OnEvent subscribes by event name (case insensitive) and -// returns send-only channel or nil -func (c *AMI) OnEvent(name string) <-chan *AMIMessage { - return c.Subs.Subscribe(name) -} - -// OnEvents subscribes by events name (case insensitive) and -// return send-only channel or nil -func (c *AMI) OnEvents(keys ...string) <-chan *AMIMessage { - return c.Subs.Subscribes(keys...) + ins.release(ctx) + return ins, nil } diff --git a/pkg/ami/ami_client.go b/pkg/ami/ami_client.go new file mode 100644 index 0000000..485c661 --- /dev/null +++ b/pkg/ami/ami_client.go @@ -0,0 +1,83 @@ +package ami + +import ( + "fmt" + "strings" + "time" +) + +func NewAmiClient() *AmiClient { + a := &AmiClient{} + return a +} + +func (a *AmiClient) SetEnabled(value bool) *AmiClient { + a.enabled = value + return a +} + +func (a *AmiClient) IsEnabled() bool { + return a.enabled +} + +func (a *AmiClient) SetHost(value string) *AmiClient { + a.host = value + return a +} + +func (a *AmiClient) Host() string { + return a.host +} + +func (a *AmiClient) SetPort(value int) *AmiClient { + a.port = value + return a +} + +func (a *AmiClient) Port() int { + return a.port +} + +func (a *AmiClient) SetUsername(value string) *AmiClient { + a.username = value + return a +} + +func (a *AmiClient) Username() string { + return a.username +} + +func (a *AmiClient) SetPassword(value string) *AmiClient { + a.password = value + return a +} + +func (a *AmiClient) SetTimeout(value time.Duration) *AmiClient { + a.timeout = value + return a +} + +func (a *AmiClient) Timeout() time.Duration { + return a.timeout +} + +func (a *AmiClient) String() string { + var builder strings.Builder + builder.WriteString(fmt.Sprintf("host=%v;", a.host)) + builder.WriteString(fmt.Sprintf("port=%v;", a.port)) + builder.WriteString(fmt.Sprintf("username=%v;", a.username)) + builder.WriteString(fmt.Sprintf("password=%v;", strings.Repeat("*", 8))) + builder.WriteString(fmt.Sprintf("timeout=%v;", a.timeout)) + return builder.String() +} + +func GetAmiClientSample() *AmiClient { + a := NewAmiClient(). + SetEnabled(true). + SetHost("127.0.0.1"). + SetPort(5038). + SetUsername("admin"). + SetPassword("password"). + SetTimeout(10 * time.Second) + return a +} diff --git a/pkg/ami/ami_error.go b/pkg/ami/ami_error.go index da6aac4..2b09ee1 100644 --- a/pkg/ami/ami_error.go +++ b/pkg/ami/ami_error.go @@ -4,40 +4,38 @@ import "fmt" var ( // ErrorAsteriskConnTimeout error on connection timeout - ErrorAsteriskConnTimeout = AMIErrorNew("Asterisk Server connection timeout") + ErrorAsteriskConnTimeout = AmiErrorWrap("Asterisk Server connection timeout") // ErrorAsteriskInvalidPrompt invalid prompt received from AMI server - ErrorAsteriskInvalidPrompt = AMIErrorNew("Asterisk Server invalid prompt command line") + ErrorAsteriskInvalidPrompt = AmiErrorWrap("Asterisk Server invalid prompt command line") // ErrorAsteriskNetwork networking errors - ErrorAsteriskNetwork = AMIErrorNew("Network error") + ErrorAsteriskNetwork = AmiErrorWrap("Network error") - // ErrorAsteriskLogin AMI server login failed - ErrorAsteriskLogin = AMIErrorNew("Asterisk Server login failed") + // ErrorAsteriskAuthenticated AMI server authenticated unsuccessful + ErrorAsteriskAuthenticated = AmiErrorWrap("Asterisk Server authenticated unsuccessful") - // Error EOF - ErrorEOF = "EOF" - - // Error I/O - ErrorIO = "io: read/write on closed pipe" - ErrorLoginFailed = "Failed login" + // Error messages + ErrorEOF = "EOF" + ErrorIO = "io: read/write on closed pipe" + ErrorAuthenticatedUnsuccessfully = "Authenticated unsuccessful" ) -type AMIError struct { +type AmiError struct { S string E string } -func AMIErrorNew(message string) *AMIError { - return &AMIError{S: message} +func AmiErrorWrap(message string) *AmiError { + return &AmiError{S: message} } -func (e *AMIError) AMIError(message string, args ...interface{}) *AMIError { +func (e *AmiError) ErrorWrap(message string, args ...interface{}) *AmiError { t := fmt.Sprintf(message, args...) e.E = fmt.Sprintf(": %s", t) return e } -func (e *AMIError) Error() string { +func (e *AmiError) Error() string { return fmt.Sprintf("ami has error ocurred: %s%s", e.S, e.E) } diff --git a/pkg/ami/ami_factory.go b/pkg/ami/ami_factory.go new file mode 100644 index 0000000..d8945bb --- /dev/null +++ b/pkg/ami/ami_factory.go @@ -0,0 +1,37 @@ +package ami + +import ( + "fmt" + "net" +) + +type AmiFactory interface { + Connect(host string, port int) (net.Conn, error) +} + +func NewTcp() *tcpAmiFactory { + return &tcpAmiFactory{} +} + +func NewUdp() *udpAmiFactory { + return &udpAmiFactory{} +} + +func (t *tcpAmiFactory) Connect(host string, port int) (net.Conn, error) { + return OnTcpConn(host, port) +} + +func (u *udpAmiFactory) Connect(host string, port int) (net.Conn, error) { + return OnUdpConn(host, port) +} + +func NewClient(factory AmiFactory, request AmiClient) (*AMI, error) { + if !request.IsEnabled() { + return nil, fmt.Errorf("Ami unavailable") + } + conn, err := factory.Connect(request.host, request.port) + if err != nil { + return nil, err + } + return serve(conn, request) +} \ No newline at end of file diff --git a/pkg/ami/ami_message.go b/pkg/ami/ami_message.go index 93f9cf5..3022e0b 100644 --- a/pkg/ami/ami_message.go +++ b/pkg/ami/ami_message.go @@ -68,8 +68,8 @@ func ofMessageWithDictionary(d *AMIDictionary, header textproto.MIMEHeader) *AMI return m } -// Login action by message -func LoginWith(username, password string) *AMIMessage { +// Authenticate action by message +func Authenticate(username, password string) *AMIMessage { a := NewActionWith(config.AmiLoginKey) a.AddField(config.AmiFieldUsername, username) a.AddField(config.AmiFieldSecret, password) diff --git a/pkg/ami/ami_model.go b/pkg/ami/ami_model.go index c9472a2..52ca8eb 100644 --- a/pkg/ami/ami_model.go +++ b/pkg/ami/ami_model.go @@ -11,6 +11,17 @@ import ( type PubChannel chan *AMIMessage type MessageChannel map[string]PubChannel +type tcpAmiFactory struct{} +type udpAmiFactory struct{} + +type AmiClient struct { + enabled bool + host string + port int + username string + password string + timeout time.Duration +} type AMI struct { Mutex sync.RWMutex `json:"-"` diff --git a/pkg/ami/ami_dial.go b/pkg/ami/ami_network.go similarity index 55% rename from pkg/ami/ami_dial.go rename to pkg/ami/ami_network.go index 865d01c..345f04f 100644 --- a/pkg/ami/ami_dial.go +++ b/pkg/ami/ami_network.go @@ -1,57 +1,11 @@ package ami import ( - "bufio" - "context" "net" - "net/textproto" "github.com/pnguyen215/voipkit/pkg/ami/config" ) -// OnConnContext initializes a new AMI client with the provided network connection. -// It returns the initialized AMI client along with a cancellable context for managing its lifecycle. -// -// Parameters: -// - conn: The network connection used by the AMI client. -// -// Returns: -// - An initialized AMI client (*AMI). -// - A cancellable context for managing the AMI client's lifecycle (context.Context). -// -// Example: -// -// // Creating an AMI client with a network connection -// conn, err := net.Dial("tcp", "localhost:5038") -// if err != nil { -// log.Fatal(err) -// } -// client, ctx := OnConnContext(conn) -// // Use the client and context for AMI operations. -// // Make sure to close the connection and cancel the context when done. -// defer client.Close() -// defer client.Cancel() -func OnConnContext(conn net.Conn) (*AMI, context.Context) { - ctx, cancel := context.WithCancel(context.Background()) - client := &AMI{ - Reader: textproto.NewReader(bufio.NewReader(conn)), - Writer: bufio.NewWriter(conn), - Conn: conn, - Cancel: cancel, - } - // Check if the connection is available - if conn != nil { - addr := conn.RemoteAddr().String() - _socket, err := NewAmiSocketContext(ctx, addr) - - if err == nil { - client.Socket = _socket - D().Info("OnConnContext, cloning (addr: %v) socket connection succeeded", addr) - } - } - return client, ctx -} - // OnTcpConn opens a network connection to the specified IP address and port using the default TCP network. // // Parameters: @@ -73,7 +27,7 @@ func OnConnContext(conn net.Conn) (*AMI, context.Context) { // // Make sure to close the connection when done. // defer conn.Close() func OnTcpConn(ip string, port int) (net.Conn, error) { - return NewConn(config.AmiNetworkTcpKey, ip, port) + return NewNetwork(config.AmiNetworkTcpKey, ip, port) } // OnUdpConn opens a network connection to the specified IP address and port using the default UDP network. @@ -97,10 +51,10 @@ func OnTcpConn(ip string, port int) (net.Conn, error) { // // Make sure to close the connection when done. // defer conn.Close() func OnUdpConn(ip string, port int) (net.Conn, error) { - return NewConn(config.AmiNetworkUdpKey, ip, port) + return NewNetwork(config.AmiNetworkUdpKey, ip, port) } -// NewConn opens a network connection to the specified IP address and port using the specified network type. +// NewNetwork opens a network connection to the specified IP address and port using the specified network type. // // Parameters: // - network: The network type ("tcp", "udp", etc.). @@ -114,22 +68,22 @@ func OnUdpConn(ip string, port int) (net.Conn, error) { // Example: // // // Dialing an AMI server at localhost on port 5038 using UDP -// conn, err := NewConn("udp", "localhost", 5038) +// conn, err := NewNetwork("udp", "localhost", 5038) // if err != nil { // log.Fatal(err) // } // // Use the connection for AMI operations. // // Make sure to close the connection when done. // defer conn.Close() -func NewConn(network, ip string, port int) (net.Conn, error) { +func NewNetwork(network, ip string, port int) (net.Conn, error) { if !config.AmiNetworkKeys[network] { - return nil, AMIErrorNew("AMI: Invalid network") + return nil, AmiErrorWrap("AMI: Invalid network") } if IsStringEmpty(ip) { - return nil, AMIErrorNew("AMI: IP must be not empty") + return nil, AmiErrorWrap("AMI: IP must be not empty") } if port <= 0 { - return nil, AMIErrorNew("AMI: Port must be positive number") + return nil, AmiErrorWrap("AMI: Port must be positive number") } host, _port, _ := DecodeIp(ip) if len(host) > 0 && len(_port) > 0 { diff --git a/pkg/ami/ami_socket.go b/pkg/ami/ami_socket.go index 918db6a..03f5a7c 100644 --- a/pkg/ami/ami_socket.go +++ b/pkg/ami/ami_socket.go @@ -53,24 +53,23 @@ func (s *AMISocket) Json() string { } // NewSocket provides a new socket client, connecting to a tcp server. -func NewAmiSocketContext(ctx context.Context, address string) (*AMISocket, error) { +func WithSocket(ctx context.Context, address string) (*AMISocket, error) { var dialer net.Dialer conn, err := dialer.DialContext(ctx, config.AmiNetworkTcpKey, address) if err != nil { return nil, err } - return NewAmiSocketConn(ctx, conn, true) + return WithAmiSocketOver(ctx, conn, true) } -// NewSocket provides a new socket client, connecting to a tcp server. +// WithAmiSocketOver provides a new socket client, connecting to a tcp server. // If the reuseConn = true, then using current connection. // Otherwise, clone the connection from current connection -func NewAmiSocketConn(ctx context.Context, conn net.Conn, reuseConn bool) (*AMISocket, error) { +func WithAmiSocketOver(ctx context.Context, conn net.Conn, reuseConn bool) (*AMISocket, error) { s := NewAmiSocket() if reuseConn { s.Conn = conn } else { - // checking conn available if conn != nil { var dialer net.Dialer _conn, err := dialer.DialContext(ctx, config.AmiNetworkTcpKey, conn.RemoteAddr().String())