Skip to content

Commit

Permalink
Merge pull request #23806 from gene-redpanda/format-for-list
Browse files Browse the repository at this point in the history
rpk: Add format to `rpk topic list`
  • Loading branch information
gene-redpanda authored Oct 22, 2024
2 parents d6aabeb + 8ebb95d commit 919837d
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 4 deletions.
4 changes: 3 additions & 1 deletion src/go/rpk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ GOFUMPTCMD=gofumpt
ifeq ($(shell uname),Darwin)
BAZELCMD := bazelisk
BAZEL_GAZELLE_CMD := run --config system-clang //:gazelle
GOFUMPT_OS_CMD := $(shell find . -type f -name '*.go' -print0 | xargs -0 -n 1 $(GOFUMPTCMD) -w)
else
BAZELCMD := bazel
BAZEL_GAZELLE_CMD := run //:gazelle
GOFUMPT_OS_CMD := $(shell find -type f -name '*.go' | xargs -n1 $(GOFUMPTCMD) -w)
endif

GOOS ?= $(shell go env GOOS)
Expand Down Expand Up @@ -59,7 +61,7 @@ install_gofumpt:

run_gofumpt:
@echo "running gofumpt"
$(shell find -type f -name '*.go' | xargs -n1 $(GOFUMPTCMD) -w)
$(GOFUMPT_OS_CMD)

install_golangci_lint:
@echo "installing golangci-lint"
Expand Down
4 changes: 3 additions & 1 deletion src/go/rpk/pkg/cli/topic/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//src/go/rpk/pkg/adminapi",
"//src/go/rpk/pkg/cli/cluster",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/kafka",
"//src/go/rpk/pkg/out",
Expand Down Expand Up @@ -48,15 +47,18 @@ go_test(
srcs = [
"consume_test.go",
"describe_test.go",
"list_test.go",
"trim_test.go",
"utils_test.go",
],
embed = [":topic"],
deps = [
"//src/go/rpk/pkg/config",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//require",
"@com_github_twmb_franz_go//pkg/kerr",
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
"@com_github_twmb_franz_go_pkg_kmsg//:kmsg",
"@in_gopkg_yaml_v3//:yaml_v3",
],
)
171 changes: 169 additions & 2 deletions src/go/rpk/pkg/cli/topic/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ package topic

import (
"context"
"fmt"
"io"
"os"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
)

func newListCommand(fs afero.Fs, p *config.Params) *cobra.Command {
Expand Down Expand Up @@ -55,6 +58,17 @@ information.
// We forbid deleting internal topics (redpanda
// actually does not expose these currently), so we
// make -r and -i incompatible.

f := p.Formatter
if detailed {
if h, ok := f.Help([]detailedListTopic{}); ok {
out.Exit(h)
}
} else {
if h, ok := f.Help([]summarizedList{}); ok {
out.Exit(h)
}
}
if internal && re {
out.Exit("cannot list with internal topics and list by regular expression")
}
Expand All @@ -73,12 +87,165 @@ information.

listed, err := adm.ListTopicsWithInternal(context.Background(), topics...)
out.MaybeDie(err, "unable to request metadata: %v", err)
cluster.PrintTopics(listed, internal, detailed)

if detailed {
printDetailedListView(f, detailedListView(internal, listed), os.Stdout)
} else {
printSummarizedListView(f, summarizedListView(internal, listed), os.Stdout)
}
out.MaybeDie(err, "unable to summarize metadata: %v", err)
},
}

p.InstallFormatFlag(cmd)
cmd.Flags().BoolVarP(&detailed, "detailed", "d", false, "Print per-partition information for topics")
cmd.Flags().BoolVarP(&internal, "internal", "i", false, "Print internal topics")
cmd.Flags().BoolVarP(&re, "regex", "r", false, "Parse topics as regex; list any topic that matches any input topic expression")
return cmd
}

type summarizedList struct {
Name string `json:"name" yaml:"name"`
Partitions int `json:"partitions" yaml:"partitions"`
Replicas int `json:"replicas" yaml:"replicas"`
}

func summarizedListView(internal bool, topics kadm.TopicDetails) (resp []summarizedList) {
resp = make([]summarizedList, 0, len(topics))
for _, topic := range topics.Sorted() {
if !internal && topic.IsInternal {
continue
}
s := summarizedList{
Name: topic.Topic,
Partitions: len(topic.Partitions),
Replicas: topic.Partitions.NumReplicas(),
}
resp = append(resp, s)
}
return
}

func printSummarizedListView(f config.OutFormatter, topics []summarizedList, w io.Writer) {
if isText, _, t, err := f.Format(topics); !isText {
out.MaybeDie(err, "unable to print in the requested format %q: %v", f.Kind, err)
fmt.Fprintln(w, t)
return
}

tw := out.NewTableTo(w, "NAME", "PARTITIONS", "REPLICAS")
defer tw.Flush()
for _, topic := range topics {
tw.Print(topic.Name, topic.Partitions, topic.Replicas)
}
}

type detailedListTopic struct {
Name string `json:"name" yaml:"name"`
Partitions int `json:"partitions" yaml:"partitions"`
Replicas int `json:"replicas" yaml:"replicas"`
PartitionList []detailedListPartition `json:"partition_list" yaml:"partition_list"`
isOffline bool
isInternal bool
isEpoch bool
isLoadErr bool
}

type detailedListPartition struct {
Partition int32 `json:"partition" yaml:"partition"`
Leader int32 `json:"leader" yaml:"leader"`
Epoch int32 `json:"epoch,omitempty" yaml:"epoch"`
Replicas []int32 `json:"replicas" yaml:"replicas"`
OfflineReplicas []int32 `json:"offline_replicas" yaml:"offline_replicas"`
LoadError string `json:"load_error" yaml:"load_error"`
}

func detailedListView(internal bool, topics kadm.TopicDetails) (resp []detailedListTopic) {
resp = make([]detailedListTopic, 0, len(topics))
for _, topic := range topics.Sorted() {
if !internal && topic.IsInternal {
continue
}
d := detailedListTopic{
Name: topic.Topic,
Partitions: len(topic.Partitions),
isInternal: topic.IsInternal,
}
if len(topic.Partitions) > 0 {
d.Replicas = topic.Partitions.NumReplicas()
d.PartitionList = make([]detailedListPartition, len(topic.Partitions))
}

for k, p := range topic.Partitions.Sorted() {
d.PartitionList[k].Partition = p.Partition
d.PartitionList[k].Replicas = int32s(p.Replicas).sort()
d.PartitionList[k].Leader = p.Leader
if p.LeaderEpoch != -1 {
d.isEpoch = true
d.PartitionList[k].Epoch = p.LeaderEpoch
}
if len(p.OfflineReplicas) > 0 {
d.isOffline = true
d.PartitionList[k].OfflineReplicas = int32s(p.OfflineReplicas).sort()
}
if p.Err != nil {
d.isLoadErr = true
d.PartitionList[k].LoadError = p.Err.Error()
}
}
resp = append(resp, d)
}
return
}

func printDetailedListView(f config.OutFormatter, topics []detailedListTopic, w io.Writer) {
if isText, _, t, err := f.Format(topics); !isText {
out.MaybeDie(err, "unable to print in the requested format %q: %v", f.Kind, err)
fmt.Fprintln(w, t)
return
}

for i, topic := range topics {
var topicName string
if topic.isInternal {
topicName = fmt.Sprintf("%s (internal)", topic.Name)
} else {
topicName = topic.Name
}
fmt.Fprintf(w, "%s, %d partitions, %d replicas\n", topicName, topic.Partitions, topic.Replicas)
headers := []string{"", "partition", "leader"}
if topic.isEpoch {
headers = append(headers, "epoch")
}
headers = append(headers, "replicas")
if topic.isOffline {
headers = append(headers, "offline_replicas")
}
if topic.isLoadErr {
headers = append(headers, "load_error")
}
printPartitionTable(w, headers, topic, topic.PartitionList)
if i != len(topics)-1 {
fmt.Fprintf(w, "\n")
}
}
}

func printPartitionTable(w io.Writer, headers []string, topic detailedListTopic, partitions []detailedListPartition) {
tw := out.NewTableTo(w, headers...)
defer tw.Flush()
for _, p := range partitions {
part := []any{"", p.Partition, p.Leader}
if topic.isEpoch {
part = append(part, p.Epoch)
}
part = append(part, p.Replicas)
if topic.isOffline {
part = append(part, p.OfflineReplicas)
}
if topic.isLoadErr {
part = append(part, p.LoadError)
}
tw.Print(part...)
}
}
Loading

0 comments on commit 919837d

Please sign in to comment.