Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Refine show segment-loaded-grpc command #319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion states/balance_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func ExplainBalanceCommand(cli clientv3.KV, basePath string) *cobra.Command {
Use: "explain-balance",
Short: "explain segments and channels current balance status",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 0. set up collection, policy, servers and params
collectionID, err := cmd.Flags().GetInt64(collectionLabel)
if err != nil {
Expand All @@ -44,7 +46,7 @@ func ExplainBalanceCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

// 1. set up segment distribution view, replicas and segmentInfos
sessions, err := ListServers(cli, basePath, queryNode)
sessions, err := ListServers(ctx, cli, basePath, queryNode)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion states/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type GetConfigurationParam struct {

func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error {
p.Filter = strings.ToLower(p.Filter)
sessions, err := common.ListSessions(s.client, s.basePath)
sessions, err := common.ListSessions(ctx, s.client, s.basePath)
if err != nil {
return err
}
Expand Down
223 changes: 149 additions & 74 deletions states/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,173 @@ package states
import (
"context"
"fmt"
"sync"
"time"

"github.com/samber/lo"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

// GetDistributionCommand returns command to iterate all querynodes to list distribution.
func GetDistributionCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "segment-loaded-grpc",
Short: "list segments loaded information",
RunE: func(cmd *cobra.Command, args []string) error {
collectionID, err := cmd.Flags().GetInt64("collection")
var sealedCnt int64
if err != nil {
return err
type GetDistributionParam struct {
framework.ParamBase `use:"show segment-loaded-grpc" desc:"list segments loaded information"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
}

// GetDistributionCommand iterates all querynodes to list distribution.
func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistributionParam) error {
// list segment info to get row count information
segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool {
return p.CollectionID == 0 || p.CollectionID == s.CollectionID
})
if err != nil {
return err
}

id2Segment := lo.SliceToMap(segments, func(s *models.Segment) (int64, *models.Segment) {
return s.ID, s
})

sessions, err := common.ListSessions(ctx, s.client, s.basePath)
if err != nil {
return err
}

qnSessions := lo.Filter(sessions, func(sess *models.Session, _ int) bool {
return sess.ServerName == "querynode"
})

type clientWithID struct {
client querypbv2.QueryNodeClient
id int64
}
var wg sync.WaitGroup
clientCh := make(chan clientWithID, len(qnSessions))
for _, session := range qnSessions {
wg.Add(1)
go func(session *models.Session) {
defer wg.Done()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
sessions, err := common.ListSessions(cli, basePath)

dialCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
conn, err := grpc.DialContext(dialCtx, session.Address, opts...)
cancel()
// ignore bad session
if err != nil {
return err
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
return
}

for _, session := range sessions {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
clientv2 := querypbv2.NewQueryNodeClient(conn)
clientCh <- clientWithID{
client: clientv2,
id: session.ServerID,
}
}(session)
}
wg.Wait()
close(clientCh)

dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
conn, err := grpc.DialContext(dialCtx, session.Address, opts...)
cancel()
if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
continue
}
type distResponse struct {
resp *querypbv2.GetDataDistributionResponse
err error
id int64
}

if session.ServerName == "querynode" {
fmt.Println("===========")
fmt.Printf("ServerID %d\n", session.ServerID)
clientv2 := querypbv2.NewQueryNodeClient(conn)
resp, err := clientv2.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{
Base: &commonpbv2.MsgBase{
SourceID: -1,
TargetID: session.ServerID,
},
})
if err != nil {
fmt.Println(err.Error())
continue
}

// print channel
for _, channel := range resp.GetChannels() {
if collectionID != 0 && channel.GetCollection() != collectionID {
continue
}
fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version)
}

for _, lv := range resp.GetLeaderViews() {
if collectionID != 0 && lv.GetCollection() != collectionID {
continue
}
fmt.Printf("Leader view for channel: %s\n", lv.GetChannel())
growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments())))
fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings)
}

sealedNum := 0
for _, segment := range resp.GetSegments() {
if collectionID != 0 && segment.GetCollection() != collectionID {
continue
}
fmt.Printf("SegmentID: %d CollectionID: %d Channel: %s\n", segment.GetID(), segment.GetCollection(), segment.GetChannel())
sealedNum++
}
fmt.Println("Sealed segments number:", sealedNum)
sealedCnt += int64(sealedNum)
}
respCh := make(chan distResponse, len(qnSessions))

for idClient := range clientCh {
wg.Add(1)
go func(idClient clientWithID) {
defer wg.Done()
resp, err := idClient.client.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{
Base: &commonpbv2.MsgBase{
SourceID: -1,
TargetID: idClient.id,
},
})
respCh <- distResponse{
resp: resp,
err: err,
id: idClient.id,
}
}(idClient)
}

wg.Wait()
close(respCh)

var totalSealedCnt int
var totalSealedRowcount int64

for result := range respCh {
fmt.Println("===========")
fmt.Printf("ServerID %d\n", result.id)
if result.err != nil {
fmt.Println("Error fetching distribution:", result.err.Error())
continue
}
resp := result.resp

// print channel
for _, channel := range resp.GetChannels() {
if p.CollectionID != 0 && channel.GetCollection() != p.CollectionID {
continue
}
fmt.Printf("==== total loaded sealed segment number: %d\n", sealedCnt)
fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version)
}

return nil
},
for _, lv := range resp.GetLeaderViews() {
if p.CollectionID != 0 && lv.GetCollection() != p.CollectionID {
continue
}
fmt.Printf("Leader view for channel: %s\n", lv.GetChannel())
growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments())))
fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings)
}

sealedNum := 0
sealedRowCount := int64(0)

collSegments := lo.GroupBy(resp.GetSegments(), func(segment *querypbv2.SegmentVersionInfo) int64 {
return segment.GetCollection()
})

for collection, segments := range collSegments {
if p.CollectionID != 0 && collection != p.CollectionID {
continue
}
fmt.Printf("------ Collection %d ------\n", collection)
var collRowCount int64
for _, segment := range segments {
segmentInfo := id2Segment[segment.GetID()]
var rc int64
if segmentInfo != nil {
rc = segmentInfo.NumOfRows
}
fmt.Printf("SegmentID: %d CollectionID: %d Channel: %s, NumOfRows %d\n", segment.GetID(), segment.GetCollection(), segment.GetChannel(), rc)
sealedNum++
collRowCount += rc
}
fmt.Printf("Collection RowCount total %d\n\n", collRowCount)
sealedRowCount += collRowCount
}
fmt.Println("------------------")
fmt.Printf("Sealed segments number: %d Sealed Row Num: %d\n", sealedNum, sealedRowCount)
totalSealedCnt += sealedNum
totalSealedRowcount += sealedRowCount
}
cmd.Flags().Int64("collection", 0, "collection id to filter with")
return cmd
fmt.Println("==========================================")
fmt.Printf("\n#### total loaded sealed segment number: %d, total loaded row count: %d\n", totalSealedCnt, totalSealedRowcount)
return nil
}
9 changes: 3 additions & 6 deletions states/etcd/common/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"path"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

Expand All @@ -16,15 +15,13 @@ const (
)

// ListSessions returns all session.
func ListSessions(cli clientv3.KV, basePath string) ([]*models.Session, error) {
func ListSessions(ctx context.Context, cli clientv3.KV, basePath string) ([]*models.Session, error) {
prefix := path.Join(basePath, sessionPrefix)
return ListSessionsByPrefix(cli, prefix)
return ListSessionsByPrefix(ctx, cli, prefix)
}

// ListSessionsByPrefix returns all session with provided prefix.
func ListSessionsByPrefix(cli clientv3.KV, prefix string) ([]*models.Session, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
func ListSessionsByPrefix(ctx context.Context, cli clientv3.KV, prefix string) ([]*models.Session, error) {
resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/download/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func PullGlobalDistributionDetails(cli clientv3.KV, basePath string) *cobra.Comm
Use: "global-distribution",
Short: "pull global distribution details",
RunE: func(cmd *cobra.Command, args []string) error {
sessions, err := common.ListSessions(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := common.ListSessions(ctx, cli, basePath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/remove/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type RemoveSessionParam struct {
}

func (c *ComponentRemove) RemoveSessionCommand(ctx context.Context, p *RemoveSessionParam) error {
sessions, err := common.ListSessions(c.client, c.basePath)
sessions, err := common.ListSessions(ctx, c.client, c.basePath)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/repair/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

func doDatacoordWatch(cli clientv3.KV, basePath string, collectionID int64, vchannels []string) {
sessions, err := common.ListSessions(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := common.ListSessions(ctx, cli, basePath)
if err != nil {
fmt.Println("failed to list session")
return
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/repair/manual_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func ManualCompactionCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

func doManualCompaction(cli clientv3.KV, basePath string, collID int64) {
sessions, err := common.ListSessions(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := common.ListSessions(ctx, cli, basePath)
if err != nil {
fmt.Println("failed to list session")
return
Expand Down
11 changes: 7 additions & 4 deletions states/etcd/show/legacy_qc_cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package show

import (
"context"
"fmt"
"path"

Expand All @@ -15,9 +16,9 @@ const (
queryNodeInfoPrefix = "queryCoord-queryNodeInfo"
)

func listQueryCoordClusterNodeInfo(cli clientv3.KV, basePath string) ([]*models.Session, error) {
func listQueryCoordClusterNodeInfo(ctx context.Context, cli clientv3.KV, basePath string) ([]*models.Session, error) {
prefix := path.Join(basePath, queryNodeInfoPrefix)
return common.ListSessionsByPrefix(cli, prefix)
return common.ListSessionsByPrefix(ctx, cli, prefix)
}

// QueryCoordClusterCommand returns show querycoord-cluster command.
Expand All @@ -27,13 +28,15 @@ func QueryCoordClusterCommand(cli clientv3.KV, basePath string) *cobra.Command {
Short: "display querynode information from querycoord cluster",
Aliases: []string{"querycoord-clusters"},
RunE: func(cmd *cobra.Command, args []string) error {
sessions, err := listQueryCoordClusterNodeInfo(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := listQueryCoordClusterNodeInfo(ctx, cli, basePath)
if err != nil {
fmt.Println("failed to list tasks in querycoord", err.Error())
return nil
}

onlineSessons, _ := common.ListSessions(cli, basePath)
onlineSessons, _ := common.ListSessions(ctx, cli, basePath)
onlineSessionMap := make(map[UniqueID]struct{})
for _, s := range onlineSessons {
onlineSessionMap[s.ServerID] = struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SessionParam struct {
// SessionCommand returns show session command.
// usage: show session
func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) (*Sessions, error) {
sessions, err := common.ListSessions(c.client, c.basePath)
sessions, err := common.ListSessions(ctx, c.client, c.basePath)
if err != nil {
return nil, errors.Wrap(err, "failed to list sessions")
}
Expand Down
Loading
Loading