
Commit

Add kubectl ray cluster log command (#2296)
chiayi authored Sep 10, 2024
1 parent fb58429 commit 3e68606
Showing 4 changed files with 839 additions and 21 deletions.
53 changes: 32 additions & 21 deletions kubectl-plugin/README.md
@@ -23,24 +23,35 @@ Kubectl plugin/extension for Kuberay CLI that provides the ability to manage ray
Aliases:
get, list

Flags:
-A, --all-namespaces If present, list the requested clusters across all namespaces. Namespace in current context is ignored even if specified with --namespace.
--as string Username to impersonate for the operation. User could be a regular user or a service account in a namespace.
--as-group stringArray Group to impersonate for the operation, this flag can be repeated to specify multiple groups.
--as-uid string UID to impersonate for the operation.
--cache-dir string Default cache directory
--certificate-authority string Path to a cert file for the certificate authority
--client-certificate string Path to a client certificate file for TLS
--client-key string Path to a client key file for TLS
--cluster string The name of the kubeconfig cluster to use
--context string The name of the kubeconfig context to use
--disable-compression If true, opt-out of response compression for all requests to the server
-h, --help help for get
--insecure-skip-tls-verify If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
--kubeconfig string Path to the kubeconfig file to use for CLI requests.
-n, --namespace string If present, the namespace scope for this CLI request
--request-timeout string The length of time to wait before giving up on a single server request. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). A value of zero means don't timeout requests. (default "0")
-s, --server string The address and port of the Kubernetes API server
--tls-server-name string Server name to use for server certificate validation. If it is not provided, the hostname used to contact the server is used
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use
Description:
Retrieves the ray cluster information.

```
$ kubectl ray cluster get
NAME                     NAMESPACE   DESIRED WORKERS   AVAILABLE WORKERS   CPUS   GPUS   TPUS   MEMORY   AGE
random-kuberay-cluster   default     1                 1                   5      1      0      24Gi     13d
```

### Retrieve Ray Cluster Head Logs

Usage:
log (RAY_CLUSTER_NAME) [--out-dir directory] [--node-type all|head|worker]
**Currently only `head` is supported**

Aliases:
log, logs

Description:
Retrieves ray cluster head pod logs and all the logs under `/tmp/ray/session_latest/logs/`

```
$ kubectl ray cluster logs stable-diffusion-cluster --out-dir temp-dir
...
$ ls temp-dir/
stable-diffusion-cluster-head-hqcxt
$ ls temp-dir/stable-diffusion-cluster-head-hqcxt
agent-424238335.err dashboard_agent.log gcs_server.err monitor.err old raylet.out stdout.log
agent-424238335.out debug_state.txt gcs_server.out monitor.log ray_client_server.err runtime_env_agent.err
dashboard.err debug_state_gcs.txt log_monitor.err monitor.out ray_client_server.out runtime_env_agent.log
dashboard.log events log_monitor.log nsight raylet.err runtime_env_agent.out
```
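
The node type can also be passed explicitly. Only `head` is accepted for now, so the following is equivalent to the default behavior (the cluster name is the same illustrative one as above):

```
$ kubectl ray cluster logs stable-diffusion-cluster --node-type head --out-dir temp-dir
```
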
301 changes: 301 additions & 0 deletions kubectl-plugin/pkg/cmd/log/log.go
@@ -0,0 +1,301 @@
package log

import (
"archive/tar"
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"math"
"net/url"
"os"
"path"
"path/filepath"

"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

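// filePathInPod is the directory inside the Ray head pod that holds the Ray session logs.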
const filePathInPod = "/tmp/ray/session_latest/logs/"

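// ClusterLogOptions holds the config flags, IO streams, remote executor, and arguments for the cluster log command.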
type ClusterLogOptions struct {
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
Executor RemoteExecutor
outputDir string
nodeType string
args []string
}

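// NewClusterLogOptions returns ClusterLogOptions initialized with default config flags and the default remote executor.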
func NewClusterLogOptions(streams genericclioptions.IOStreams) *ClusterLogOptions {
return &ClusterLogOptions{
configFlags: genericclioptions.NewConfigFlags(true),
ioStreams: &streams,
Executor: &DefaultRemoteExecutor{},
}
}

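// NewClusterLogCommand builds the cobra command for retrieving ray cluster logs.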
func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
options := NewClusterLogOptions(streams)
	// Initialize the factory for later use with the current config flags
cmdFactory := cmdutil.NewFactory(options.configFlags)

cmd := &cobra.Command{
Use: "log (RAY_CLUSTER_NAME) [--out-dir DIR_PATH] [--node-type all|head|worker]",
Short: "Get ray cluster log",
Aliases: []string{"logs"},
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(args); err != nil {
return err
}
if err := options.Validate(); err != nil {
return err
}
return options.Run(cmd.Context(), cmdFactory)
},
}
	cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "Directory path to download the log files to.")
cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for.")
options.configFlags.AddFlags(cmd.Flags())
return cmd
}

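// Complete stores the positional arguments and defaults the node type to "head" when it is not set.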
func (options *ClusterLogOptions) Complete(args []string) error {
options.args = args

if options.nodeType == "" {
options.nodeType = "head"
}

return nil
}

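// Validate checks the kube context, the cluster name argument, the node type, and the output directory, creating a default directory named after the cluster when none is given.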
func (options *ClusterLogOptions) Validate() error {
// Overrides and binds the kube config then retrieves the merged result
config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig()
if err != nil {
return fmt.Errorf("Error retrieving raw config: %w", err)
}
if len(config.CurrentContext) == 0 {
return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
}

// Command must have ray cluster name
if len(options.args) != 1 {
return fmt.Errorf("must have at only one argument")
} else if options.outputDir == "" {
fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.")
options.outputDir = options.args[0]
err := os.MkdirAll(options.outputDir, 0o755)
if err != nil {
return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err)
}
}

switch options.nodeType {
case "all":
return fmt.Errorf("node type `all` is currently not supported")
case "head":
break
case "worker":
return fmt.Errorf("node type `worker` is currently not supported")
default:
return fmt.Errorf("unknown node type `%s`", options.nodeType)
}

info, err := os.Stat(options.outputDir)
if os.IsNotExist(err) {
return fmt.Errorf("Directory does not exist. Failed with: %w", err)
} else if err != nil {
return fmt.Errorf("Error occurred will checking directory: %w", err)
} else if !info.IsDir() {
return fmt.Errorf("Path is Not a directory. Please input a directory and try again")
}

return nil
}

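// Run lists the ray head pods of the cluster, writes each pod's container logs to stdout.log under the output directory, and then copies the Ray log directory out of each pod.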
func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
kubeClientSet, err := factory.KubernetesClientSet()
if err != nil {
return fmt.Errorf("failed to retrieve kubernetes client set: %w", err)
}

var listopts v1.ListOptions
if options.nodeType == "head" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]),
}
}

	// Get the list of pods that are the ray head nodes of the cluster
rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err)
}

// Get a list of logs of the ray heads.
var logList []*bytes.Buffer
for _, rayHead := range rayHeads.Items {
request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{})

podLogs, err := request.Stream(ctx)
if err != nil {
return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err)
}
defer podLogs.Close()

// Get current logs:
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err)
}

logList = append(logList, buf)
}

	// Each head pod's logs are written to a subdirectory named after the pod
	for ind, podLog := range logList {
curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log")
dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name)
err := os.MkdirAll(dirPath, 0o755)
if err != nil {
return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err)
}
file, err := os.OpenFile(curFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
if err != nil {
return fmt.Errorf("failed to create/open file for kuberay-head with path: %s: %w", curFilePath, err)
}
defer file.Close()

		_, err = podLog.WriteTo(file)
if err != nil {
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err)
}

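		// Build an exec request that runs tar inside the head pod and streams the Ray log directory back as an archive.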
req := kubeClientSet.CoreV1().RESTClient().
Get().
Namespace(rayHeads.Items[ind].Namespace).
Resource("pods").
Name(rayHeads.Items[ind].Name).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, clientgoscheme.ParameterCodec)

restconfig, err := factory.ToRESTConfig()
if err != nil {
return fmt.Errorf("failed to get restconfig: %w", err)
}

exec, err := options.Executor.CreateExecutor(restconfig, req.URL())
if err != nil {
return fmt.Errorf("failed to create executor with error: %w", err)
}

err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind])
if err != nil {
return fmt.Errorf("failed to download ray head log files with error: %w", err)
}
}
return nil
}

// RemoteExecutor creates the executor for executing exec on the pod - provided for testing purposes
type RemoteExecutor interface {
CreateExecutor(restConfig *rest.Config, url *url.URL) (remotecommand.Executor, error)
}

type DefaultRemoteExecutor struct{}

// CreateExecutor returns the executor created by NewSPDYExecutor
func (dre *DefaultRemoteExecutor) CreateExecutor(restConfig *rest.Config, url *url.URL) (remotecommand.Executor, error) {
return remotecommand.NewSPDYExecutor(restConfig, "POST", url)
}
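
// A fake RemoteExecutor can be substituted in unit tests. A minimal sketch, assuming a
// hypothetical fakeExecutor type (illustrative only, not part of this package):
//
//	type fakeExecutor struct{}
//
//	func (f *fakeExecutor) CreateExecutor(_ *rest.Config, _ *url.URL) (remotecommand.Executor, error) {
//		return nil, nil
//	}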

// downloadRayLogFiles uses the executor to retrieve the log files from the given ray head
func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayhead corev1.Pod) error {
outreader, outStream := io.Pipe()
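	// Stream the remote tar output into the pipe writer on a separate goroutine so it can be extracted concurrently below.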
go func() {
defer outStream.Close()
err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: options.ioStreams.In,
Stdout: outStream,
Stderr: options.ioStreams.ErrOut,
Tty: false,
})
if err != nil {
log.Fatalf("Error occurred while calling remote command: %v", err)
}
}()

	// Go through the tar archive and create/copy each entry into the destination dir
tarReader := tar.NewReader(outreader)
header, err := tarReader.Next()
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayhead.Name, err)
}
for !errors.Is(err, io.EOF) {
fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayhead.Name)
if err != nil {
return fmt.Errorf("Error reading tar archive: %w", err)
}

		// Construct the full local path for this tar entry under the destination directory
localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayhead.Name), path.Clean(header.Name))

switch header.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(localFilePath, 0o755); err != nil {
return fmt.Errorf("Error creating directory: %w", err)
}
case tar.TypeReg:
			// Check the file mode for overflow (gosec G115) and skip files whose mode is out of range
			if header.Mode < 0 || header.Mode > math.MaxUint32 {
				fmt.Fprintf(options.ioStreams.Out, "file mode %d is outside the acceptable range, skipping file %s\n", header.Mode, header.Name)
				break
			}
// Create file and write contents
			outFile, err := os.OpenFile(localFilePath, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) //nolint:gosec // file mode conversion from int64 to uint32 is range-checked above
if err != nil {
return fmt.Errorf("Error creating file: %w", err)
}
defer outFile.Close()
			// Copy in bounded chunks to limit the effect of a decompression bomb; the chunk size is currently arbitrary
for {
n, err := io.CopyN(outFile, tarReader, 1000000)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("failed while writing to file: %w", err)
}
if n == 0 {
break
}
}
default:
fmt.Printf("Ignoring unsupported file type: %b", header.Typeflag)
}

header, err = tarReader.Next()
if header == nil && err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("error while extracting tar file with error: %w", err)
}
}

return nil
}