Skip to content

Commit

Permalink
🔇 silent changes: updated comment and document #14
Browse files Browse the repository at this point in the history
  • Loading branch information
pnguyen215 committed Jan 13, 2024
1 parent 5c81a75 commit 83ca460
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 42 deletions.
244 changes: 209 additions & 35 deletions pkg/ami/ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,23 @@ import (
"github.com/pnguyen215/voipkit/pkg/ami/config"
)

func (c *AMI) EmitError(err error) {
go func(err error) {
c.Mutex.Lock()
defer c.Mutex.Unlock()
if err == nil || c.Err == nil {
return
}
// c.mu.Lock()
// defer c.mu.Unlock()
c.Err <- err
}(err)
}

// Error returns channel for error and signals that client should be probably restarted
func (c *AMI) Error() <-chan error {
c.Mutex.RLock()
defer c.Mutex.RUnlock()
return c.Err
}

// Close client and destroy all subscriptions to events and action responses
func (c *AMI) Close() {
c.Mutex.Lock()
defer c.Mutex.Unlock()
c.Cancel()
c.Subs.Destroy()
c.Conn.Close()
close(c.Err)
c.Err = nil
}

// Action send AMI message to Asterisk server and returns send-only
// response channel on nil
// Action sends an AMI action message to the Asterisk server.
// If the action message does not have an ActionID, it adds one automatically.
// The method returns true if the action message is successfully sent, otherwise false.
// In case of an error during the write operation, it emits an error event and returns false.
//
// Parameters:
// - message: Pointer to an AMIMessage representing the action message to be sent.
//
// Example:
//
// actionMessage := NewAMIMessage("Command")
// actionMessage.AddParameter("Action", "Status")
// actionMessage.AddParameter("Command", "core show channels")
// amiClient.Action(actionMessage)
//
// Note: This method is typically used to send action requests to the Asterisk Manager Interface (AMI)
// for tasks such as retrieving status information or executing commands.
func (c *AMI) Action(message *AMIMessage) bool {
if message.GetActionId() == "" {
message.AddActionId()
Expand All @@ -56,8 +40,22 @@ func (c *AMI) Action(message *AMIMessage) bool {
return true
}

// AllEvents subscribes to any AMI message received from Asterisk server
// returns send-only channel or nil
// AllEvents subscribes to any AMI message received from the Asterisk server.
// It returns a send-only channel of pointers to AMIMessage or nil if not subscribed.
//
// Example:
//
// allEventsChannel := amiClient.AllEvents()
// go func() {
// for message := range allEventsChannel {
// // Handle AMI messages received from the Asterisk server
// fmt.Println("Received AMI Message:", message)
// }
// }()
//
// Note: This method allows you to subscribe to all Asterisk Manager Interface (AMI) messages,
// providing a channel through which you can receive and handle messages as they are received from the server.
// It is useful for capturing and reacting to various events happening within the Asterisk communication system.
func (c *AMI) AllEvents() <-chan *AMIMessage {
return c.Subs.Subscribe(config.AmiPubSubKeyRef)
}
Expand All @@ -74,12 +72,100 @@ func (c *AMI) OnEvents(keys ...string) <-chan *AMIMessage {
return c.Subs.Subscribes(keys...)
}

// EmitError sends an error to the error channel (c.Err) in a non-blocking manner.
// If the error or the error channel is nil, it returns immediately.
//
// Example:
//
// amiClient.EmitError(errors.New("An error occurred"))
//
// Note: This method is designed to be used internally to safely send errors to the error channel
// without blocking the main execution flow.
func (c *AMI) EmitError(err error) {
go func(err error) {
c.Mutex.Lock()
defer c.Mutex.Unlock()
if err == nil || c.Err == nil {
return
}
c.Err <- err
}(err)
}

// Error returns a read-only channel for errors, signaling that the AMI client encountered an error,
// and the client might need to be restarted.
//
// Example:
//
// errorChannel := amiClient.Error()
// select {
// case err := <-errorChannel:
// log.Printf("Error received: %v", err)
// // Handle the error, possibly restarting the AMI client.
// }
//
// Note: This method provides a non-blocking way to receive errors from the AMI client.
// It returns the error channel, allowing clients to listen for errors and take appropriate actions.
func (c *AMI) Error() <-chan error {
c.Mutex.RLock()
defer c.Mutex.RUnlock()
return c.Err
}

// Close closes the AMI client, terminating its connection and cleaning up resources.
// It also destroys all subscriptions to events and action responses.
//
// Example:
//
// amiClient.Close()
//
// Note: Closing the AMI client is crucial to ensure proper cleanup of resources.
// It terminates the connection, cancels the context, destroys event subscriptions, and closes the error channel.
// Once closed, the AMI client should not be used further, and a new instance may be created if needed.
func (c *AMI) Close() {
c.Mutex.Lock()
defer c.Mutex.Unlock()
c.Cancel()
c.Subs.Destroy()
c.Conn.Close()
close(c.Err)
c.Err = nil
}

// publish sends the provided AMI message to all subscribers based on event type and general subscriptions.
// It utilizes the AMI Pub-Sub mechanism for broadcasting messages to interested parties.
// This method is typically called after receiving an AMI message to notify subscribers.
//
// Example:
//
// amiClient.publish(amiMessage)
//
// Note: The AMI Pub-Sub mechanism allows different parts of the application to subscribe to specific events or receive all events.
// When a new AMI message is received, this method broadcasts it to relevant subscribers.
func (c *AMI) publish(message *AMIMessage) {
if message != nil {
c.Subs.Publish(message)
}
}

// apply sends an action to Asterisk and waits for the acknowledgment prompt.
// It sets a timeout for the operation to prevent blocking for an extended period.
// This method is used to execute actions in the Asterisk Manager Interface (AMI).
//
// Parameters:
// - _ctx: The parent context in which the action is executed.
// - timeout: The maximum duration allowed for the operation, including sending the action and waiting for the acknowledgment.
//
// Returns:
// - An error if the operation encounters any issues or if it times out.
//
// Example:
//
// err := amiClient.apply(context.Background(), 5*time.Second)
//
// Note: The method sends the action to Asterisk and waits for a response. It validates the acknowledgment prompt
// to ensure that the operation is successful. If the prompt is not received or does not match the expected format,
// an error is returned. The provided context and timeout parameters help control the duration of the operation.
func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(_ctx, timeout)
defer cancel()
Expand Down Expand Up @@ -112,6 +198,20 @@ func (c *AMI) apply(_ctx context.Context, timeout time.Duration) error {
return nil
}

// read reads a message from the Asterisk Manager Interface (AMI) connection.
// It parses the MIME headers and creates an AMIMessage from the received data.
//
// Returns:
// - An AMIMessage containing the parsed MIME headers.
// - An error if there is an issue reading or parsing the message.
//
// Example:
//
// message, err := amiClient.read()
//
// Note: The read method is used to read messages from the AMI connection. It parses MIME headers
// and creates an AMIMessage struct to represent the received data. If an error occurs during the read
// operation, it is returned as an error.
func (c *AMI) read() (*AMIMessage, error) {
headers, err := c.Reader.ReadMIMEHeader()
if err != nil {
Expand All @@ -125,6 +225,21 @@ func (c *AMI) read() (*AMIMessage, error) {
return message, nil
}

// write sends the provided bytes to the Asterisk Manager Interface (AMI) connection.
//
// Parameters:
// - bytes: The byte slice to be sent to the AMI connection.
//
// Returns:
// - An error if there is an issue writing the bytes to the connection.
//
// Example:
//
// err := amiClient.write([]byte("Action: Login\nUsername: admin\nSecret: mySecret\n\n"))
//
// Note: The write method is used to send bytes to the AMI connection. It acquires a lock on the
// connection to ensure thread safety and writes the bytes using the Writer. Any error during the
// write operation is returned as an error.
func (c *AMI) write(bytes []byte) error {
c.Mutex.Lock()
defer c.Mutex.Unlock()
Expand All @@ -139,6 +254,20 @@ func (c *AMI) write(bytes []byte) error {
return nil
}

// release initiates the message handling loop for incoming data from the Asterisk Manager Interface (AMI) connection.
// It continuously reads messages from the connection, processes them, and publishes them to the appropriate subscribers.
//
// Parameters:
// - ctx: The context.Context used for cancellation and error handling.
//
// Example:
//
// amiClient.release(ctx)
//
// Note: The release method is responsible for managing the continuous reading of messages from the AMI connection.
// It initializes a new PubSubQueue for event subscriptions and a channel for errors. The function runs in a goroutine,
// continuously reading messages from the connection, publishing them to subscribers, and handling errors. It terminates
// when the provided context is canceled or an error occurs during the reading process.
func (c *AMI) release(ctx context.Context) {
c.Subs = NewPubSubQueue()
c.Err = make(chan error)
Expand Down Expand Up @@ -168,6 +297,29 @@ func (c *AMI) release(ctx context.Context) {
}()
}

// authenticate performs the authentication process with the Asterisk Manager Interface (AMI) server using the provided
// AmiClient configuration. It sends an authentication request, reads the response, and verifies the success of the authentication.
//
// Parameters:
// - _ctx: The context.Context used for cancellation and timeout.
// - request: The AmiClient configuration containing authentication details.
//
// Returns:
// - error: An error indicating any issues during the authentication process.
//
// Example:
//
// err := amiClient.authenticate(ctx, AmiClient{
// username: "admin",
// password: "secret*password",
// timeout: time.Second * 5,
// })
//
// Note: The authenticate method sends an authentication request to the AMI server, reads the response, and checks if the
// authentication was successful. It uses the provided context for cancellation and timeout handling. If the context is canceled,
// it returns an error indicating a connection timeout. If there is an issue reading the response or the authentication fails,
// it returns an appropriate error. If the authentication is successful, it updates the Raw field of the AMI client with the
// authentication response.
func (c *AMI) authenticate(_ctx context.Context, request AmiClient) error {
ctx, cancel := context.WithTimeout(_ctx, request.timeout)
defer cancel()
Expand Down Expand Up @@ -247,6 +399,28 @@ func create(conn net.Conn) (*AMI, context.Context) {
return c, ctx
}

// serve initializes a new AMI client, establishes a connection, performs the authentication process, and releases resources.
//
// Parameters:
// - conn: The net.Conn representing the connection to the Asterisk Manager Interface (AMI) server.
// - request: The AmiClient configuration containing connection and authentication details.
//
// Returns:
// - *AMI: A pointer to the initialized AMI client if the process succeeds.
// - error: An error indicating any issues during the initialization or authentication process.
//
// Example:
//
// amiClient, err := serve(conn, AmiClient{
// username: "admin",
// password: "secret*password",
// timeout: time.Second * 5,
// })
//
// Note: The serve function creates a new AMI client instance, applies necessary settings and configurations, establishes
// a connection to the AMI server, performs the authentication process, and releases resources. If any step in the process
// fails, it returns an appropriate error. If the initialization and authentication are successful, it returns a pointer to
// the AMI client ready for further interaction with the Asterisk server.
func serve(conn net.Conn, request AmiClient) (*AMI, error) {
ins, ctx := create(conn)
err := ins.apply(ctx, request.timeout)
Expand Down
31 changes: 30 additions & 1 deletion pkg/ami/ami_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,51 @@ type AmiFactory interface {
Connect(host string, port int) (net.Conn, error)
}

// NewTcp creates a new instance of the tcpAmiFactory, implementing the AmiFactory interface for TCP connections.
func NewTcp() *tcpAmiFactory {
return &tcpAmiFactory{}
}

// NewUdp creates a new instance of the udpAmiFactory, implementing the AmiFactory interface for UDP connections.
func NewUdp() *udpAmiFactory {
return &udpAmiFactory{}
}

// Connect establishes a TCP connection to the specified host and port.
func (t *tcpAmiFactory) Connect(host string, port int) (net.Conn, error) {
return OnTcpConn(host, port)
}

// Connect establishes a UDP connection to the specified host and port.
func (u *udpAmiFactory) Connect(host string, port int) (net.Conn, error) {
return OnUdpConn(host, port)
}

// NewClient creates a new AMI client using the provided AmiFactory and AmiClient configuration.
//
// Parameters:
// - factory: An AmiFactory implementation responsible for establishing a network connection.
// - request: The AmiClient configuration containing connection and authentication details.
//
// Returns:
// - *AMI: A pointer to the initialized AMI client if the process succeeds.
// - error: An error indicating any issues during the initialization or authentication process.
//
// Example:
//
// amiClient, err := NewClient(NewTcp(), AmiClient{
// host: "localhost",
// port: 5038,
// username: "admin",
// password: "secret*password",
// timeout: time.Second * 5,
// })
//
// Note: The NewClient function uses the provided AmiFactory to establish a network connection based on the specified
// transport protocol (TCP or UDP). It then calls the serve function to initialize the AMI client, perform authentication,
// and release resources. If any step in the process fails, it returns an appropriate error. If the initialization and
// authentication are successful, it returns a pointer to the AMI client ready for further interaction with the Asterisk
// server.
func NewClient(factory AmiFactory, request AmiClient) (*AMI, error) {
if !request.IsEnabled() {
return nil, fmt.Errorf("Ami unavailable")
Expand All @@ -34,4 +63,4 @@ func NewClient(factory AmiFactory, request AmiClient) (*AMI, error) {
return nil, err
}
return serve(conn, request)
}
}
Loading

0 comments on commit 83ca460

Please sign in to comment.