Skip to content

Commit

Permalink
Merge pull request #24134 from r-vasquez/rpk-mount-cloud
Browse files Browse the repository at this point in the history
rpk: add Redpanda Cloud support to `rpk cluster storage` mount/unmount commands
  • Loading branch information
r-vasquez authored Nov 18, 2024
2 parents 1102318 + 8650b26 commit 17cd70d
Show file tree
Hide file tree
Showing 18 changed files with 341 additions and 111 deletions.
4 changes: 2 additions & 2 deletions src/go/rpk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ endif

GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)
GOPATH ?= $(shell go env GOPATH)
OUTDIR := $(GOOS)-$(GOARCH)

REV := $(shell git rev-parse --short HEAD)
Expand Down Expand Up @@ -64,8 +65,7 @@ run_gofumpt:
$(GOFUMPT_OS_CMD)

install_golangci_lint:
@echo "installing golangci-lint"
@$(GOCMD) install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
@which $(GOLANGCILINTCMD) || (if [ $$? -eq 1 ]; then echo "golangci-lint not found, installing..."; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.62.0; fi)

run_linter:
$(GOLANGCILINTCMD) run
Expand Down
10 changes: 5 additions & 5 deletions src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
buf.build/gen/go/redpandadata/cloud/connectrpc/go v1.17.0-20241024195046-353ea4645e3d.1
buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-353ea4645e3d.1
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1
cloud.google.com/go/compute/metadata v0.5.2
connectrpc.com/connect v1.17.0
github.com/AlecAivazis/survey/v2 v2.3.7
Expand Down Expand Up @@ -140,9 +140,9 @@ require (
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.26.0 // indirect
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gotest.tools/v3 v3.0.3 // indirect
Expand Down
20 changes: 10 additions & 10 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-35
buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-353ea4645e3d.1/go.mod h1:KYw4KQVGbgMkHR4br5uQjNFwT3b5TML5Ll3SLAsh4Ho=
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1 h1:H/JebbbR+Kd0vXXY4cyqUZOmmXw0YUvQjjmvHBnKSpw=
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1/go.mod h1:AD5cSkm/Wy/YTKR9VKtnKAoYxbLZSh/pYC8g9VCeMJA=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1 h1:sLbN1qppoFCmfp8e/h8z1y7TyFqQYWJlyJpaybQGsmw=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1/go.mod h1:J9MSgmioQQG//z0cS0pEkkZn3Q0DxRT8UvLk1dAYNhM=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1 h1:Ytc3jVPUHAA6Lep8ptfesx3zsWeroHVHbloNP1q6HXc=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1/go.mod h1:5WJc8OWoe83gSQv52+xclVntrgm2tixef9S61wyQAQA=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1 h1:4cD7CRcJLTjA45y5xoL5qPyqiV0pTyNKgJ9jMg8c2so=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1/go.mod h1:lAVv5Nv6SZUV8+UFtUfFF2mMS4WlDp1CsOSPtNgrjPE=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1 h1:FoxR0Huu43isy8t/JcQkeORWN6KYb0SDoCKLrpU529E=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1/go.mod h1:+/pdQipFpdMztKw+xaZFHGUrwMfHLu1qyKOGpTsWFeA=
cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo=
cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k=
connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk=
Expand Down Expand Up @@ -368,12 +368,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 h1:Q3nlH8iSQSRUwOskjbcSMcF2jiYMNiQYZ0c2KEJLKKU=
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38/go.mod h1:xBI+tzfqGGN2JBeSebfKXFSdBpWVQ7sLW40PTupVRm4=
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38 h1:2oV8dfuIkM1Ti7DwXc0BJfnwr9csz4TDXI9EmiI+Rbw=
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38/go.mod h1:vuAjtvlwkDKF6L1GQ0SokiRLCGFfeBUXWr/aFFkHACc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 h1:KJjNNclfpIkVqrZlTWcgOOaVQ00LdBnoEaRfkUx760s=
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:mt9/MofW7AWQ+Gy179ChOnvmJatV8YHUmrcedo9CIFI=
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g=
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
Expand Down
3 changes: 3 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ go_library(
"//src/go/rpk/pkg/cli/cluster/storage/recovery",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/out",
"//src/go/rpk/pkg/publicapi",
"@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2",
"@com_connectrpc_connect//:connect",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
Expand Down
32 changes: 25 additions & 7 deletions src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"fmt"
"strconv"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand All @@ -36,18 +38,34 @@ Cancel a mount/unmount operation
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

migrationID, err := strconv.Atoi(from[0])
out.MaybeDie(err, "invalid migration ID: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
fmt.Printf("Successfully canceled the operation with ID %v", migrationID)
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

req := connect.NewRequest(
&dataplanev1alpha2.UpdateMountTaskRequest{
Id: int32(migrationID),
Action: dataplanev1alpha2.UpdateMountTaskRequest_ACTION_CANCEL,
},
)
_, err = cl.CloudStorage.UpdateMountTask(cmd.Context(), req)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
}

fmt.Printf("Successfully canceled the operation with ID %v\n", migrationID)
},
}
return cmd
Expand Down
84 changes: 73 additions & 11 deletions src/go/rpk/pkg/cli/cluster/storage/list-mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"os"
"strings"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -52,19 +54,33 @@ Use filter to list only migrations in a specific state
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

migrations, err := adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
var migrations []rpadmin.MigrationState
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

resp, err := cl.CloudStorage.ListMountTasks(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountTasksRequest{}))
out.MaybeDie(err, "unable to list mount/unmount operations: %v", err)

if resp != nil {
migrations = listMountTaskToAdminMigrationState(resp.Msg)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrations, err = adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
}
printDetailedListMount(f, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter the list of migrations by state. Only valid for text")
cmd.Flags().StringVarP(&filter, "filter", "f", "all", "Filter the list of migrations by state. Only valid for text")
return cmd
}

Expand Down Expand Up @@ -96,7 +112,7 @@ func (f filterOpts) String() string {
}

func filterOptFromString(s string) filterOpts {
switch s {
switch strings.ToLower(s) {
case "planned":
return FilterOptsPlanned
case "prepared":
Expand Down Expand Up @@ -150,9 +166,11 @@ func rpadminTopicsToStringSlice(in []rpadmin.NamespacedOrInboundTopic) (resp []s
for _, entry := range in {
if entry.Namespace != nil {
resp = append(resp, fmt.Sprintf("%s/%s", *entry.Namespace, entry.Topic))
continue
} else if entry.SourceTopicReference.Topic != "" {
resp = append(resp, entry.SourceTopicReference.Topic)
} else {
resp = append(resp, entry.Topic)
}
resp = append(resp, entry.Topic)
}
return
}
Expand All @@ -163,3 +181,47 @@ type migrationState struct {
MigrationType string `json:"type" yaml:"type"`
Topics []string `json:"topics" yaml:"topics"`
}

func listMountTaskToAdminMigrationState(resp *dataplanev1alpha2.ListMountTasksResponse) []rpadmin.MigrationState {
var migrations []rpadmin.MigrationState
if resp != nil {
for _, task := range resp.Tasks {
if task != nil {
migrations = append(migrations, rpadmin.MigrationState{
ID: int(task.Id),
State: strings.TrimPrefix(task.State.String(), "STATE_"),
Migration: rpadmin.Migration{
MigrationType: task.Type.String(),
Topics: mountTaskTopicsToNamespacedOrInboundTopics(task.Topics, task.Type),
},
})
}
}
}
return migrations
}

// mountTaskTopicsToNamespacedOrInboundTopics converts the dataplane's
// mountTaskTopics to the rpadmin's equivalent.
func mountTaskTopicsToNamespacedOrInboundTopics(taskTopics []*dataplanev1alpha2.MountTask_Topic, taskType dataplanev1alpha2.MountTask_Type) []rpadmin.NamespacedOrInboundTopic {
var topics []rpadmin.NamespacedOrInboundTopic
for _, topic := range taskTopics {
// Inbound == Mount.
if taskType == dataplanev1alpha2.MountTask_TYPE_MOUNT {
topics = append(topics, rpadmin.NamespacedOrInboundTopic{
InboundTopic: rpadmin.InboundTopic{
SourceTopicReference: rpadmin.NamespacedTopic{
Topic: topic.SourceTopicReference, // The topic of the bucket you are mounting.
},
},
})
} else {
topics = append(topics, rpadmin.NamespacedOrInboundTopic{
NamespacedTopic: rpadmin.NamespacedTopic{
Topic: topic.TopicReference, // The topic in the cluster you are un-mounting.
},
})
}
}
return topics
}
44 changes: 37 additions & 7 deletions src/go/rpk/pkg/cli/cluster/storage/list-mountable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"os"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -33,15 +35,30 @@ List all mountable topics:
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

response, err := adm.ListMountableTopics(cmd.Context())
out.MaybeDie(err, "unable to list mountable topics: %v", err)
printDetailedListMountable(p.Formatter, rpadminMountableTopicsToMountableTopicState(response.Topics), os.Stdout)
var mountableTopics []rpadmin.MountableTopic
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

resp, err := cl.CloudStorage.ListMountableTopics(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountableTopicsRequest{}))
out.MaybeDie(err, "unable to list mountable topics: %v", err)
if resp != nil {
mountableTopics = dataplaneToAdminMountableTopics(resp.Msg)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

response, err := adm.ListMountableTopics(cmd.Context())
out.MaybeDie(err, "unable to list mountable topics: %v", err)
mountableTopics = response.Topics
}

printDetailedListMountable(f, rpadminMountableTopicsToMountableTopicState(mountableTopics), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
Expand Down Expand Up @@ -83,3 +100,16 @@ func rpadminMountableTopicsToMountableTopicState(in []rpadmin.MountableTopic) []
}
return resp
}

func dataplaneToAdminMountableTopics(resp *dataplanev1alpha2.ListMountableTopicsResponse) []rpadmin.MountableTopic {
var topics []rpadmin.MountableTopic
if resp != nil {
for _, topic := range resp.Topics {
topics = append(topics, rpadmin.MountableTopic{
TopicLocation: topic.TopicLocation,
Topic: topic.Name,
})
}
}
return topics
}
Loading

0 comments on commit 17cd70d

Please sign in to comment.