Skip to content

Commit

Permalink
Add kubectl ray cluster log command
Browse files Browse the repository at this point in the history
  • Loading branch information
chiayi committed Aug 9, 2024
1 parent 6c1c16e commit dc38dc8
Show file tree
Hide file tree
Showing 4 changed files with 473 additions and 0 deletions.
31 changes: 31 additions & 0 deletions kubectl-plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,34 @@ Kubectl plugin/extension for Kuberay CLI that provides the ability to manage ray
--tls-server-name string Server name to use for server certificate validation. If it is not provided, the hostname used to contact the server is used
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use

### Retrieve Ray Cluster Head Logs

Usage:
ray cluster log (CLUSTER_NAME) [flags]

Aliases:
log, logs

Flags:
--as string Username to impersonate for the operation. User could be a regular user or a service account in a namespace.
--as-group stringArray Group to impersonate for the operation, this flag can be repeated to specify multiple groups.
--as-uid string UID to impersonate for the operation.
--cache-dir string Default cache directory (default "/Users/aaronliang/.kube/cache")
--certificate-authority string Path to a cert file for the certificate authority
--client-certificate string Path to a client certificate file for TLS
--client-key string Path to a client key file for TLS
--cluster string The name of the kubeconfig cluster to use
--context string The name of the kubeconfig context to use
--disable-compression If true, opt-out of response compression for all requests to the server
-h, --help help for log
--insecure-skip-tls-verify If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
--kubeconfig string Path to the kubeconfig file to use for CLI requests.
-n, --namespace string If present, the namespace scope for this CLI request
--out-dir string File Directory PATH of where to download the file logs to.
--request-timeout string The length of time to wait before giving up on a single server request. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). A value of zero means don't timeout requests. (default "0")
-s, --server string The address and port of the Kubernetes API server
--tls-server-name string Server name to use for server certificate validation. If it is not provided, the hostname used to contact the server is used
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use
aaronliang-macbookpro:kubectl-plugin aaronliang$ kubectl ray cluster log raycluster-sample
1 change: 1 addition & 0 deletions kubectl-plugin/pkg/cmd/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ func NewClusterCommand(streams genericclioptions.IOStreams) *cobra.Command {
}

cmd.AddCommand(NewClusterGetCommand(streams))
cmd.AddCommand(NewClusterLogCommand(streams))
return cmd
}
161 changes: 161 additions & 0 deletions kubectl-plugin/pkg/cmd/cluster/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package cluster

import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"

"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

type ClusterLogOptions struct {
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
outputDir string
args []string
downloadLog bool
}

func NewClusterLogOptions(streams genericclioptions.IOStreams) *ClusterLogOptions {
return &ClusterLogOptions{
configFlags: genericclioptions.NewConfigFlags(true),
ioStreams: &streams,
}
}

func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
options := NewClusterLogOptions(streams)
// Initialize the factory for later use with the current config flag
cmdFactory := cmdutil.NewFactory(options.configFlags)

cmd := &cobra.Command{
Use: "log (CLUSTER_NAME)",
Short: "Get cluster log",
Aliases: []string{"logs"},
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(args); err != nil {
return err
}
if err := options.Validate(); err != nil {
return err
}
return options.Run(cmd.Context(), cmdFactory)
},
}
cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.")
options.configFlags.AddFlags(cmd.Flags())
return cmd
}

func (options *ClusterLogOptions) Complete(args []string) error {
if options.outputDir != "" {
options.downloadLog = true
}

options.args = args
return nil
}

func (options *ClusterLogOptions) Validate() error {
// Overrides and binds the kube config then retrieves the merged result
config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig()
if err != nil {
return fmt.Errorf("Error retrieving raw config: %w", err)
}
if len(config.CurrentContext) == 0 {
return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
}
// If there is a output directory to download to, we validate the directory
if options.downloadLog {
info, err := os.Stat(options.outputDir)
if os.IsNotExist(err) {
return fmt.Errorf("Directory does not exist. Failed with %w", err)
} else if err != nil {
return fmt.Errorf("Error occurred will checking directory %w", err)
} else if !info.IsDir() {
return fmt.Errorf("Path is Not a directory. Please input a directory and try again")
}
}
// Command must have ray cluster name
if len(options.args) > 1 {
return fmt.Errorf("must have at most one argument")
}
return nil
}

func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
kubeClientSet, err := factory.KubernetesClientSet()
if err != nil {
return fmt.Errorf("failed to retrieve kubernetes client set: %w", err)
}

var listopts v1.ListOptions
if len(options.args) == 1 {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]),
}
} else {
listopts = v1.ListOptions{
LabelSelector: "ray.io/group=headgroup",
}
}

// Get list of nodes that are considered ray heads
rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err)
}

// get a list of logs of the ray heads.
var logList []*bytes.Buffer
for _, rayHead := range rayHeads.Items {
request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{})

podLogs, err := request.Stream(ctx)
if err != nil {
return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err)
}
defer podLogs.Close()

// Get current logs:
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err)
}

logList = append(logList, buf)
}

// depending on the if there is a out-dir or not, it will either print out the log or download the logs
// File name format is name of the ray head
if options.downloadLog {
for ind, logList := range logList {
curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name)
file, err := os.OpenFile(curFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
if err != nil {
return fmt.Errorf("failed to create/open file for kuberay-head with path: %s: %w", rayHeads.Items[ind].Name, err)
}
defer file.Close()

_, err = logList.WriteTo(file)
if err != nil {
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err)
}
}
} else {
for ind, logList := range logList {
fmt.Fprintf(options.ioStreams.Out, "Head Name: %s\n", rayHeads.Items[ind].Name)
fmt.Fprint(options.ioStreams.Out, logList.String())
}
}
return nil
}
Loading

0 comments on commit dc38dc8

Please sign in to comment.