diff --git a/pkg/ami/ami.go b/pkg/ami/ami.go index 371b333..7fbb8d3 100644 --- a/pkg/ami/ami.go +++ b/pkg/ami/ami.go @@ -11,39 +11,23 @@ import ( "github.com/pnguyen215/voipkit/pkg/ami/config" ) -func (c *AMI) EmitError(err error) { - go func(err error) { - c.Mutex.Lock() - defer c.Mutex.Unlock() - if err == nil || c.Err == nil { - return - } - // c.mu.Lock() - // defer c.mu.Unlock() - c.Err <- err - }(err) -} - -// Error returns channel for error and signals that client should be probably restarted -func (c *AMI) Error() <-chan error { - c.Mutex.RLock() - defer c.Mutex.RUnlock() - return c.Err -} - -// Close client and destroy all subscriptions to events and action responses -func (c *AMI) Close() { - c.Mutex.Lock() - defer c.Mutex.Unlock() - c.Cancel() - c.Subs.Destroy() - c.Conn.Close() - close(c.Err) - c.Err = nil -} - -// Action send AMI message to Asterisk server and returns send-only -// response channel on nil +// Action sends an AMI action message to the Asterisk server. +// If the action message does not have an ActionID, it adds one automatically. +// The method returns true if the action message is successfully sent, otherwise false. +// In case of an error during the write operation, it emits an error event and returns false. +// +// Parameters: +// - message: Pointer to an AMIMessage representing the action message to be sent. +// +// Example: +// +// actionMessage := NewAMIMessage("Command") +// actionMessage.AddParameter("Action", "Status") +// actionMessage.AddParameter("Command", "core show channels") +// amiClient.Action(actionMessage) +// +// Note: This method is typically used to send action requests to the Asterisk Manager Interface (AMI) +// for tasks such as retrieving status information or executing commands. func (c *AMI) Action(message *AMIMessage) bool { if message.GetActionId() == "" { message.AddActionId() @@ -56,8 +40,22 @@ func (c *AMI) Action(message *AMIMessage) bool { return true } -// AllEvents subscribes to any AMI message received from Asterisk server -// returns send-only channel or nil +// AllEvents subscribes to any AMI message received from the Asterisk server. +// It returns a send-only channel of pointers to AMIMessage or nil if not subscribed. +// +// Example: +// +// allEventsChannel := amiClient.AllEvents() +// go func() { +// for message := range allEventsChannel { +// // Handle AMI messages received from the Asterisk server +// fmt.Println("Received AMI Message:", message) +// } +// }() +// +// Note: This method allows you to subscribe to all Asterisk Manager Interface (AMI) messages, +// providing a channel through which you can receive and handle messages as they are received from the server. +// It is useful for capturing and reacting to various events happening within the Asterisk communication system. func (c *AMI) AllEvents() <-chan *AMIMessage { return c.Subs.Subscribe(config.AmiPubSubKeyRef) } @@ -74,12 +72,100 @@ func (c *AMI) OnEvents(keys ...string) <-chan *AMIMessage { return c.Subs.Subscribes(keys...) } +// EmitError sends an error to the error channel (c.Err) in a non-blocking manner. +// If the error or the error channel is nil, it returns immediately. +// +// Example: +// +// amiClient.EmitError(errors.New("An error occurred")) +// +// Note: This method is designed to be used internally to safely send errors to the error channel +// without blocking the main execution flow. +func (c *AMI) EmitError(err error) { + go func(err error) { + c.Mutex.Lock() + defer c.Mutex.Unlock() + if err == nil || c.Err == nil { + return + } + c.Err <- err + }(err) +} + +// Error returns a read-only channel for errors, signaling that the AMI client encountered an error, +// and the client might need to be restarted. +// +// Example: +// +// errorChannel := amiClient.Error() +// select { +// case err := <-errorChannel: +// log.Printf("Error received: %v", err) +// // Handle the error, possibly restarting the AMI client. +// } +// +// Note: This method provides a non-blocking way to receive errors from the AMI client. +// It returns the error channel, allowing clients to listen for errors and take appropriate actions. +func (c *AMI) Error() <-chan error { + c.Mutex.RLock() + defer c.Mutex.RUnlock() + return c.Err +} + +// Close closes the AMI client, terminating its connection and cleaning up resources. +// It also destroys all subscriptions to events and action responses. +// +// Example: +// +// amiClient.Close() +// +// Note: Closing the AMI client is crucial to ensure proper cleanup of resources. +// It terminates the connection, cancels the context, destroys event subscriptions, and closes the error channel. +// Once closed, the AMI client should not be used further, and a new instance may be created if needed. +func (c *AMI) Close() { + c.Mutex.Lock() + defer c.Mutex.Unlock() + c.Cancel() + c.Subs.Destroy() + c.Conn.Close() + close(c.Err) + c.Err = nil +} + +// publish sends the provided AMI message to all subscribers based on event type and general subscriptions. +// It utilizes the AMI Pub-Sub mechanism for broadcasting messages to interested parties. +// This method is typically called after receiving an AMI message to notify subscribers. +// +// Example: +// +// amiClient.publish(amiMessage) +// +// Note: The AMI Pub-Sub mechanism allows different parts of the application to subscribe to specific events or receive all events. +// When a new AMI message is received, this method broadcasts it to relevant subscribers. func (c *AMI) publish(message *AMIMessage) { if message != nil { c.Subs.Publish(message) } } +// apply sends an action to Asterisk and waits for the acknowledgment prompt. +// It sets a timeout for the operation to prevent blocking for an extended period. +// This method is used to execute actions in the Asterisk Manager Interface (AMI). +// +// Parameters: +// - _ctx: The parent context in which the action is executed. +// - timeout: The maximum duration allowed for the operation, including sending the action and waiting for the acknowledgment. +// +// Returns: +// - An error if the operation encounters any issues or if it times out. +// +// Example: +// +// err := amiClient.apply(context.Background(), 5*time.Second) +// +// Note: The method sends the action to Asterisk and waits for a response. It validates the acknowledgment prompt +// to ensure that the operation is successful. If the prompt is not received or does not match the expected format, +// an error is returned. The provided context and timeout parameters help control the duration of the operation. func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error { ctx, cancel := context.WithTimeout(_ctx, timeout) defer cancel() @@ -112,6 +198,20 @@ func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error { return nil } +// read reads a message from the Asterisk Manager Interface (AMI) connection. +// It parses the MIME headers and creates an AMIMessage from the received data. +// +// Returns: +// - An AMIMessage containing the parsed MIME headers. +// - An error if there is an issue reading or parsing the message. +// +// Example: +// +// message, err := amiClient.read() +// +// Note: The read method is used to read messages from the AMI connection. It parses MIME headers +// and creates an AMIMessage struct to represent the received data. If an error occurs during the read +// operation, it is returned as an error. func (c *AMI) read() (*AMIMessage, error) { headers, err := c.Reader.ReadMIMEHeader() if err != nil { @@ -125,6 +225,21 @@ func (c *AMI) read() (*AMIMessage, error) { return message, nil } +// write sends the provided bytes to the Asterisk Manager Interface (AMI) connection. +// +// Parameters: +// - bytes: The byte slice to be sent to the AMI connection. +// +// Returns: +// - An error if there is an issue writing the bytes to the connection. +// +// Example: +// +// err := amiClient.write([]byte("Action: Login\nUsername: admin\nSecret: mySecret\n\n")) +// +// Note: The write method is used to send bytes to the AMI connection. It acquires a lock on the +// connection to ensure thread safety and writes the bytes using the Writer. Any error during the +// write operation is returned as an error. func (c *AMI) write(bytes []byte) error { c.Mutex.Lock() defer c.Mutex.Unlock() @@ -139,6 +254,20 @@ func (c *AMI) write(bytes []byte) error { return nil } +// release initiates the message handling loop for incoming data from the Asterisk Manager Interface (AMI) connection. +// It continuously reads messages from the connection, processes them, and publishes them to the appropriate subscribers. +// +// Parameters: +// - ctx: The context.Context used for cancellation and error handling. +// +// Example: +// +// amiClient.release(ctx) +// +// Note: The release method is responsible for managing the continuous reading of messages from the AMI connection. +// It initializes a new PubSubQueue for event subscriptions and a channel for errors. The function runs in a goroutine, +// continuously reading messages from the connection, publishing them to subscribers, and handling errors. It terminates +// when the provided context is canceled or an error occurs during the reading process. func (c *AMI) release(ctx context.Context) { c.Subs = NewPubSubQueue() c.Err = make(chan error) @@ -168,6 +297,29 @@ func (c *AMI) release(ctx context.Context) { }() } +// authenticate performs the authentication process with the Asterisk Manager Interface (AMI) server using the provided +// AmiClient configuration. It sends an authentication request, reads the response, and verifies the success of the authentication. +// +// Parameters: +// - _ctx: The context.Context used for cancellation and timeout. +// - request: The AmiClient configuration containing authentication details. +// +// Returns: +// - error: An error indicating any issues during the authentication process. +// +// Example: +// +// err := amiClient.authenticate(ctx, AmiClient{ +// username: "admin", +// password: "secret*password", +// timeout: time.Second * 5, +// }) +// +// Note: The authenticate method sends an authentication request to the AMI server, reads the response, and checks if the +// authentication was successful. It uses the provided context for cancellation and timeout handling. If the context is canceled, +// it returns an error indicating a connection timeout. If there is an issue reading the response or the authentication fails, +// it returns an appropriate error. If the authentication is successful, it updates the Raw field of the AMI client with the +// authentication response. func (c *AMI) authenticate(_ctx context.Context, request AmiClient) error { ctx, cancel := context.WithTimeout(_ctx, request.timeout) defer cancel() @@ -247,6 +399,28 @@ func create(conn net.Conn) (*AMI, context.Context) { return c, ctx } +// serve initializes a new AMI client, establishes a connection, performs the authentication process, and releases resources. +// +// Parameters: +// - conn: The net.Conn representing the connection to the Asterisk Manager Interface (AMI) server. +// - request: The AmiClient configuration containing connection and authentication details. +// +// Returns: +// - *AMI: A pointer to the initialized AMI client if the process succeeds. +// - error: An error indicating any issues during the initialization or authentication process. +// +// Example: +// +// amiClient, err := serve(conn, AmiClient{ +// username: "admin", +// password: "secret*password", +// timeout: time.Second * 5, +// }) +// +// Note: The serve function creates a new AMI client instance, applies necessary settings and configurations, establishes +// a connection to the AMI server, performs the authentication process, and releases resources. If any step in the process +// fails, it returns an appropriate error. If the initialization and authentication are successful, it returns a pointer to +// the AMI client ready for further interaction with the Asterisk server. func serve(conn net.Conn, request AmiClient) (*AMI, error) { ins, ctx := create(conn) err := ins.apply(ctx, request.timeout) diff --git a/pkg/ami/ami_factory.go b/pkg/ami/ami_factory.go index d8945bb..dbf952f 100644 --- a/pkg/ami/ami_factory.go +++ b/pkg/ami/ami_factory.go @@ -9,22 +9,51 @@ type AmiFactory interface { Connect(host string, port int) (net.Conn, error) } +// NewTcp creates a new instance of the tcpAmiFactory, implementing the AmiFactory interface for TCP connections. func NewTcp() *tcpAmiFactory { return &tcpAmiFactory{} } +// NewUdp creates a new instance of the udpAmiFactory, implementing the AmiFactory interface for UDP connections. func NewUdp() *udpAmiFactory { return &udpAmiFactory{} } +// Connect establishes a TCP connection to the specified host and port. func (t *tcpAmiFactory) Connect(host string, port int) (net.Conn, error) { return OnTcpConn(host, port) } +// Connect establishes a UDP connection to the specified host and port. func (u *udpAmiFactory) Connect(host string, port int) (net.Conn, error) { return OnUdpConn(host, port) } +// NewClient creates a new AMI client using the provided AmiFactory and AmiClient configuration. +// +// Parameters: +// - factory: An AmiFactory implementation responsible for establishing a network connection. +// - request: The AmiClient configuration containing connection and authentication details. +// +// Returns: +// - *AMI: A pointer to the initialized AMI client if the process succeeds. +// - error: An error indicating any issues during the initialization or authentication process. +// +// Example: +// +// amiClient, err := NewClient(NewTcp(), AmiClient{ +// host: "localhost", +// port: 5038, +// username: "admin", +// password: "secret*password", +// timeout: time.Second * 5, +// }) +// +// Note: The NewClient function uses the provided AmiFactory to establish a network connection based on the specified +// transport protocol (TCP or UDP). It then calls the serve function to initialize the AMI client, perform authentication, +// and release resources. If any step in the process fails, it returns an appropriate error. If the initialization and +// authentication are successful, it returns a pointer to the AMI client ready for further interaction with the Asterisk +// server. func NewClient(factory AmiFactory, request AmiClient) (*AMI, error) { if !request.IsEnabled() { return nil, fmt.Errorf("Ami unavailable") @@ -34,4 +63,4 @@ func NewClient(factory AmiFactory, request AmiClient) (*AMI, error) { return nil, err } return serve(conn, request) -} \ No newline at end of file +} diff --git a/pkg/ami/ami_pub_channel.go b/pkg/ami/ami_pub_channel.go index 6f46bfa..7d67059 100644 --- a/pkg/ami/ami_pub_channel.go +++ b/pkg/ami/ami_pub_channel.go @@ -82,24 +82,29 @@ func (k *AMIPubSubQueue) Subscribes(keys ...string) PubChannel { return ch } +// Publish broadcasts the provided AMI message to all subscribers interested in the corresponding event type. +// It also broadcasts the message to subscribers interested in all events. +// Returns true if the message is successfully published; otherwise, returns false. +// +// Example: +// +// pubSubQueue.Publish(amiMessage) +// +// Note: The AMI Pub-Sub mechanism allows subscribers to receive notifications for specific events or all events. +// This method ensures that the message is sent to relevant subscribers based on event type and general subscriptions. func (k *AMIPubSubQueue) Publish(message *AMIMessage) bool { k.Mutex.RLock() defer k.Mutex.RUnlock() - if k.Off { return false } - ch, ok := k.Message[config.AmiPubSubKeyRef] - if ok { go func(ch PubChannel) { ch <- message }(ch) } - name := strings.ToLower(message.Field(strings.ToLower(config.AmiEventKey))) - if name != "" { if ch, ok := k.Message[name]; ok { go func(ch PubChannel) { @@ -107,6 +112,5 @@ func (k *AMIPubSubQueue) Publish(message *AMIMessage) bool { }(ch) } } - return true }