Skip to content

Commit

Permalink
refactor: support close admin-cli client
Browse files Browse the repository at this point in the history
#2160

Current admin-cli client does not provide an interface to
close replica server TCP connections.
And the QueryAllNodesDiskInfo function is difficult to use.
SendQueryDiskInfoRequest function is private.
  • Loading branch information
lupengfan1 committed Dec 3, 2024
1 parent f8de6da commit b62189a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 3 deletions.
45 changes: 45 additions & 0 deletions admin-cli/executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"os"
"strings"

"github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/util"
Expand All @@ -45,6 +46,7 @@ type Client struct {
}

// NewClient creates a client for accessing Pegasus cluster for use of admin-cli.
// This function will call os.Exit.
func NewClient(writer io.Writer, metaAddrs []string) *Client {
meta := client.NewRPCBasedMeta(metaAddrs)

Expand All @@ -67,3 +69,46 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client {
Perf: aggregate.NewPerfClient(metaAddrs),
}
}

// NewClientWithoutExit creates a client for accessing Pegasus cluster for use of admin-cli.
// This function will not call os.Exit.
func NewClientWithoutExit(writer io.Writer, metaAddrs []string) (*Client, error) {
meta := client.NewRPCBasedMeta(metaAddrs)

nodes, err := meta.ListNodes()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err)
return nil, fmt.Errorf("fatal: failed to list nodes [%s]", err)
}

var replicaAddrs []string
for _, node := range nodes {
replicaAddrs = append(replicaAddrs, node.Address.GetAddress())
}

return &Client{
Writer: writer,
Meta: meta,
Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs),
Perf: aggregate.NewPerfClient(metaAddrs),
}, nil
}

func CloseClient(writer io.Writer, client *Client) error {
var errorStrings []string
err := client.Meta.Close()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to close meta session [%s]\n", err)
errorStrings = append(errorStrings, err.Error())
}

client.Perf.Close()

err = client.Nodes.CloseAllNodes()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to close nodes session [%s]\n", err)
errorStrings = append(errorStrings, err.Error())
}

return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
33 changes: 30 additions & 3 deletions admin-cli/executor/disk_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ package executor
import (
"context"
"fmt"
"strings"
"time"

"github.com/apache/incubator-pegasus/admin-cli/tabular"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/radmin"
"github.com/apache/incubator-pegasus/go-client/session"
Expand All @@ -45,7 +47,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string,
}

func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string, print bool) ([]interface{}, error) {
resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +62,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, ta
}
}

func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) {
func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand Down Expand Up @@ -88,7 +90,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin
}
for _, nodeInfo := range nodeInfos {
address := nodeInfo.GetAddress().GetAddress()
resp, err := sendQueryDiskInfoRequest(client, address, tableName)
resp, err := SendQueryDiskInfoRequest(client, address, tableName)
if err != nil {
return respMap, err
}
Expand All @@ -97,6 +99,31 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin
return respMap, nil
}

func QueryAliveNodesDiskInfo(client *Client, tableName string) (map[string]*radmin.QueryDiskInfoResponse, error) {
respMap := make(map[string]*radmin.QueryDiskInfoResponse)
nodeInfos, err := client.Meta.ListNodes()
if err != nil {
return respMap, err
}
for _, nodeInfo := range nodeInfos {
if nodeInfo.Status != admin.NodeStatus_NS_ALIVE {
continue
}
address := nodeInfo.GetAddress().GetAddress()
resp, err := SendQueryDiskInfoRequest(client, address, tableName)
if err != nil {
// this replica server haven't the table partition.
if strings.Contains(err.Error(), "ERR_OBJECT_NOT_FOUND") {
continue
} else {
return respMap, err
}
}
respMap[address] = resp
}
return respMap, nil
}

type DiskCapacityStruct struct {
Disk string `json:"disk"`
Capacity int64 `json:"capacity"`
Expand Down
22 changes: 22 additions & 0 deletions admin-cli/util/pegasus_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress {
return base.NewRPCAddress(n.IP, n.Port)
}

func (n *PegasusNode) Close() error {
if n.session != nil {
return n.session.Close()
} else {

Check failure on line 90 in admin-cli/util/pegasus_node.go

View workflow job for this annotation

GitHub Actions / Lint

`if` block ends with a `return` statement, so drop this `else` and outdent its block (golint)
return nil
}
}

// NewNodeFromTCPAddr creates a node from tcp address.
// NOTE:
// - Will not initialize TCP connection unless needed.
Expand Down Expand Up @@ -211,3 +219,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, ntype session.NodeType)

return aggregate.WrapPerf(addr, node.session)
}

func (m *PegasusNodeManager) CloseAllNodes() error {
var errorStrings []string
for _, n := range m.nodes {
err := n.Close()
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) != 0 {
return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
return nil
}

0 comments on commit b62189a

Please sign in to comment.