diff --git a/kubectl-plugin/README.md b/kubectl-plugin/README.md index ca606d3db3c..7f3a23bc413 100644 --- a/kubectl-plugin/README.md +++ b/kubectl-plugin/README.md @@ -23,24 +23,35 @@ Kubectl plugin/extension for Kuberay CLI that provides the ability to manage ray Aliases: get, list - Flags: - -A, --all-namespaces If present, list the requested clusters across all namespaces. Namespace in current context is ignored even if specified with --namespace. - --as string Username to impersonate for the operation. User could be a regular user or a service account in a namespace. - --as-group stringArray Group to impersonate for the operation, this flag can be repeated to specify multiple groups. - --as-uid string UID to impersonate for the operation. - --cache-dir string Default cache directory - --certificate-authority string Path to a cert file for the certificate authority - --client-certificate string Path to a client certificate file for TLS - --client-key string Path to a client key file for TLS - --cluster string The name of the kubeconfig cluster to use - --context string The name of the kubeconfig context to use - --disable-compression If true, opt-out of response compression for all requests to the server - -h, --help help for get - --insecure-skip-tls-verify If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure - --kubeconfig string Path to the kubeconfig file to use for CLI requests. - -n, --namespace string If present, the namespace scope for this CLI request - --request-timeout string The length of time to wait before giving up on a single server request. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). A value of zero means don't timeout requests. (default "0") - -s, --server string The address and port of the Kubernetes API server - --tls-server-name string Server name to use for server certificate validation. If it is not provided, the hostname used to contact the server is used - --token string Bearer token for authentication to the API server - --user string The name of the kubeconfig user to use + Description: + Retrieves the ray cluster information. + + ``` + $ kubectl ray cluster get + NAME NAMESPACE DESIRED WORKERS AVAILABLE WORKERS CPUS GPUS TPUS MEMORY AGE + random-kuberay-cluster default 1 1 5 1 0 24Gi 13d + ``` + +### Retrieve Ray Cluster Head Logs + + Usage: + log (RAY_CLUSTER_NAME) [--out-dir directory] [--node-type all|head|worker] + **Currently only `head` is supported** + + Aliases: + log, logs + + Description: + Retrieves ray cluster head pod logs and all the logs under `/tmp/ray/session_latest/logs/` + + ``` + $ kubectl ray cluster logs --out-dir temp-dir + ... + $ ls temp-dir/ + stable-diffusion-cluster-head-hqcxt + $ ls temp-dir/stable-diffusion-cluster-head-hqcxt + agent-424238335.err dashboard_agent.log gcs_server.err monitor.err old raylet.out stdout.log + agent-424238335.out debug_state.txt gcs_server.out monitor.log ray_client_server.err runtime_env_agent.err + dashboard.err debug_state_gcs.txt log_monitor.err monitor.out ray_client_server.out runtime_env_agent.log + dashboard.log events log_monitor.log nsight raylet.err runtime_env_agent.out + ``` diff --git a/kubectl-plugin/pkg/cmd/log/log.go b/kubectl-plugin/pkg/cmd/log/log.go new file mode 100644 index 00000000000..0f09e41b075 --- /dev/null +++ b/kubectl-plugin/pkg/cmd/log/log.go @@ -0,0 +1,301 @@ +package log + +import ( + "archive/tar" + "bytes" + "context" + "errors" + "fmt" + "io" + "log" + "math" + "net/url" + "os" + "path" + "path/filepath" + + "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +const filePathInPod = "/tmp/ray/session_latest/logs/" + +type ClusterLogOptions struct { + configFlags *genericclioptions.ConfigFlags + ioStreams *genericclioptions.IOStreams + Executor RemoteExecutor + outputDir string + nodeType string + args []string +} + +func NewClusterLogOptions(streams genericclioptions.IOStreams) *ClusterLogOptions { + return &ClusterLogOptions{ + configFlags: genericclioptions.NewConfigFlags(true), + ioStreams: &streams, + Executor: &DefaultRemoteExecutor{}, + } +} + +func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command { + options := NewClusterLogOptions(streams) + // Initialize the factory for later use with the current config flag + cmdFactory := cmdutil.NewFactory(options.configFlags) + + cmd := &cobra.Command{ + Use: "log (RAY_CLUSTER_NAME) [--out-dir DIR_PATH] [--node-type all|head|worker]", + Short: "Get ray cluster log", + Aliases: []string{"logs"}, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + if err := options.Complete(args); err != nil { + return err + } + if err := options.Validate(); err != nil { + return err + } + return options.Run(cmd.Context(), cmdFactory) + }, + } + cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.") + cmd.Flags().StringVar(&options.nodeType, "node-type", options.nodeType, "Type of Ray node to download the files for.") + options.configFlags.AddFlags(cmd.Flags()) + return cmd +} + +func (options *ClusterLogOptions) Complete(args []string) error { + options.args = args + + if options.nodeType == "" { + options.nodeType = "head" + } + + return nil +} + +func (options *ClusterLogOptions) Validate() error { + // Overrides and binds the kube config then retrieves the merged result + config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig() + if err != nil { + return fmt.Errorf("Error retrieving raw config: %w", err) + } + if len(config.CurrentContext) == 0 { + return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context ") + } + + // Command must have ray cluster name + if len(options.args) != 1 { + return fmt.Errorf("must have at only one argument") + } else if options.outputDir == "" { + fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.") + options.outputDir = options.args[0] + err := os.MkdirAll(options.outputDir, 0o755) + if err != nil { + return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err) + } + } + + switch options.nodeType { + case "all": + return fmt.Errorf("node type `all` is currently not supported") + case "head": + break + case "worker": + return fmt.Errorf("node type `worker` is currently not supported") + default: + return fmt.Errorf("unknown node type `%s`", options.nodeType) + } + + info, err := os.Stat(options.outputDir) + if os.IsNotExist(err) { + return fmt.Errorf("Directory does not exist. Failed with: %w", err) + } else if err != nil { + return fmt.Errorf("Error occurred will checking directory: %w", err) + } else if !info.IsDir() { + return fmt.Errorf("Path is Not a directory. Please input a directory and try again") + } + + return nil +} + +func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error { + kubeClientSet, err := factory.KubernetesClientSet() + if err != nil { + return fmt.Errorf("failed to retrieve kubernetes client set: %w", err) + } + + var listopts v1.ListOptions + if options.nodeType == "head" { + listopts = v1.ListOptions{ + LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]), + } + } + + // Get list of nodes that are considered ray heads + rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts) + if err != nil { + return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err) + } + + // Get a list of logs of the ray heads. + var logList []*bytes.Buffer + for _, rayHead := range rayHeads.Items { + request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{}) + + podLogs, err := request.Stream(ctx) + if err != nil { + return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err) + } + defer podLogs.Close() + + // Get current logs: + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err) + } + + logList = append(logList, buf) + } + + // Pod file name format is name of the ray head + for ind, logList := range logList { + curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log") + dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name) + err := os.MkdirAll(dirPath, 0o755) + if err != nil { + return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err) + } + file, err := os.OpenFile(curFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) + if err != nil { + return fmt.Errorf("failed to create/open file for kuberay-head with path: %s: %w", curFilePath, err) + } + defer file.Close() + + _, err = logList.WriteTo(file) + if err != nil { + return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err) + } + + req := kubeClientSet.CoreV1().RESTClient(). + Get(). + Namespace(rayHeads.Items[ind].Namespace). + Resource("pods"). + Name(rayHeads.Items[ind].Name). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."}, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + }, clientgoscheme.ParameterCodec) + + restconfig, err := factory.ToRESTConfig() + if err != nil { + return fmt.Errorf("failed to get restconfig: %w", err) + } + + exec, err := options.Executor.CreateExecutor(restconfig, req.URL()) + if err != nil { + return fmt.Errorf("failed to create executor with error: %w", err) + } + + err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind]) + if err != nil { + return fmt.Errorf("failed to download ray head log files with error: %w", err) + } + } + return nil +} + +// RemoteExecutor creates the executor for executing exec on the pod - provided for testing purposes +type RemoteExecutor interface { + CreateExecutor(restConfig *rest.Config, url *url.URL) (remotecommand.Executor, error) +} + +type DefaultRemoteExecutor struct{} + +// CreateExecutor returns the executor created by NewSPDYExecutor +func (dre *DefaultRemoteExecutor) CreateExecutor(restConfig *rest.Config, url *url.URL) (remotecommand.Executor, error) { + return remotecommand.NewSPDYExecutor(restConfig, "POST", url) +} + +// downloadRayLogFiles will use to the executor and retrieve the logs file from the inputted ray head +func (options *ClusterLogOptions) downloadRayLogFiles(ctx context.Context, exec remotecommand.Executor, rayhead corev1.Pod) error { + outreader, outStream := io.Pipe() + go func() { + defer outStream.Close() + err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: options.ioStreams.In, + Stdout: outStream, + Stderr: options.ioStreams.ErrOut, + Tty: false, + }) + if err != nil { + log.Fatalf("Error occurred while calling remote command: %v", err) + } + }() + + // Goes through the tar and create/copy them one by one into the destination dir + tarReader := tar.NewReader(outreader) + header, err := tarReader.Next() + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("error will extracting head tar file for ray head %s: %w", rayhead.Name, err) + } + for !errors.Is(err, io.EOF) { + fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayhead.Name) + if err != nil { + return fmt.Errorf("Error reading tar archive: %w", err) + } + + // Construct the full local path and a directory for the tmp file logs + localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(rayhead.Name), path.Clean(header.Name)) + + switch header.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(localFilePath, 0o755); err != nil { + return fmt.Errorf("Error creating directory: %w", err) + } + case tar.TypeReg: + // Check for overflow: G115 + if header.Mode < 0 || header.Mode > math.MaxUint32 { + fmt.Fprintf(options.ioStreams.Out, "file mode out side of accceptable value %d skipping file", header.Mode) + } + // Create file and write contents + outFile, err := os.OpenFile(localFilePath, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) //nolint:gosec // lint failing due to file mode conversion from uint64 to int32, checked above + if err != nil { + return fmt.Errorf("Error creating file: %w", err) + } + defer outFile.Close() + // This is to limit the copy size for a decompression bomb, currently set arbitrarily + for { + n, err := io.CopyN(outFile, tarReader, 1000000) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return fmt.Errorf("failed while writing to file: %w", err) + } + if n == 0 { + break + } + } + default: + fmt.Printf("Ignoring unsupported file type: %b", header.Typeflag) + } + + header, err = tarReader.Next() + if header == nil && err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("error while extracting tar file with error: %w", err) + } + } + + return nil +} diff --git a/kubectl-plugin/pkg/cmd/log/log_test.go b/kubectl-plugin/pkg/cmd/log/log_test.go new file mode 100644 index 00000000000..c5c38c58f6b --- /dev/null +++ b/kubectl-plugin/pkg/cmd/log/log_test.go @@ -0,0 +1,504 @@ +package log + +import ( + "archive/tar" + "bytes" + "context" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/cli-runtime/pkg/genericiooptions" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/rest" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/rest/fake" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/tools/remotecommand" + cmdtesting "k8s.io/kubectl/pkg/cmd/testing" + "k8s.io/kubectl/pkg/scheme" +) + +// Mocked NewSPDYExecutor +var fakeNewSPDYExecutor = func(method string, url *url.URL, inputbuf *bytes.Buffer) (remotecommand.Executor, error) { + return &fakeExecutor{method: method, url: url, buf: inputbuf}, nil +} + +type fakeExecutor struct { + url *url.URL + buf *bytes.Buffer + method string +} + +// Stream is needed for implementing remotecommand.Execute +func (f *fakeExecutor) Stream(_ remotecommand.StreamOptions) error { + return nil +} + +// downloadRayLogFiles uses StreamWithContext so this is the real function that we are mocking +func (f *fakeExecutor) StreamWithContext(_ context.Context, options remotecommand.StreamOptions) error { + _, err := io.Copy(options.Stdout, f.buf) + return err +} + +// createFakeTarFile creates the fake tar file that will be used for testing +func createFakeTarFile() (*bytes.Buffer, error) { + // Create a buffer to hold the tar archive + tarbuff := new(bytes.Buffer) + + // Create a tar writer + tw := tar.NewWriter(tarbuff) + + // Define the files/directories to include + files := []struct { + ModTime time.Time + Name string + Body string + IsDir bool + Mode int64 + }{ + {time.Now(), "/", "", true, 0o755}, + {time.Now(), "file1.txt", "This is the content of file1.txt\n", false, 0o644}, + {time.Now(), "file2.txt", "Content of file2.txt inside subdir\n", false, 0o644}, + } + + // Add each file/directory to the tar archive + for _, file := range files { + hdr := &tar.Header{ + Name: file.Name, + Mode: file.Mode, + ModTime: file.ModTime, + Size: int64(len(file.Body)), + } + if file.IsDir { + hdr.Typeflag = tar.TypeDir + } else { + hdr.Typeflag = tar.TypeReg + } + + // Write the header + if err := tw.WriteHeader(hdr); err != nil { + return nil, err + } + + // Write the file content (if not a directory) + if !file.IsDir { + if _, err := tw.Write([]byte(file.Body)); err != nil { + return nil, err + } + } + } + + // Close the tar writer + if err := tw.Close(); err != nil { + return nil, err + } + return tarbuff, nil +} + +type FakeRemoteExecutor struct{} + +func (dre *FakeRemoteExecutor) CreateExecutor(_ *rest.Config, url *url.URL) (remotecommand.Executor, error) { + return fakeNewSPDYExecutor("GET", url, new(bytes.Buffer)) +} + +func TestRayClusterLogComplete(t *testing.T) { + testStreams, _, _, _ := genericclioptions.NewTestIOStreams() + fakeClusterLogOptions := NewClusterLogOptions(testStreams) + fakeArgs := []string{"Expectedoutput"} + + err := fakeClusterLogOptions.Complete(fakeArgs) + + assert.Equal(t, fakeClusterLogOptions.nodeType, "head") + assert.Nil(t, err) + assert.Equal(t, fakeClusterLogOptions.args, fakeArgs) +} + +func TestRayClusterLogValidate(t *testing.T) { + testStreams, _, _, _ := genericclioptions.NewTestIOStreams() + + testNS, testContext, testBT, testImpersonate := "test-namespace", "test-contet", "test-bearer-token", "test-person" + + // Fake directory for kubeconfig + fakeDir, err := os.MkdirTemp("", "fake-config") + assert.Nil(t, err) + defer os.RemoveAll(fakeDir) + + // Set up fake config for kubeconfig + config := &api.Config{ + Clusters: map[string]*api.Cluster{ + "test-cluster": { + Server: "https://fake-kubernetes-cluster.example.com", + InsecureSkipTLSVerify: true, // For testing purposes + }, + }, + Contexts: map[string]*api.Context{ + "my-fake-context": { + Cluster: "my-fake-cluster", + AuthInfo: "my-fake-user", + }, + }, + CurrentContext: "my-fake-context", + AuthInfos: map[string]*api.AuthInfo{ + "my-fake-user": { + Token: "", // Empty for testing without authentication + }, + }, + } + + fakeFile := filepath.Join(fakeDir, ".kubeconfig") + + if err := clientcmd.WriteToFile(*config, fakeFile); err != nil { + t.Fatalf("Failed to write kubeconfig to temp file: %v", err) + } + + // Initialize the fake config flag with the fake kubeconfig and values + fakeConfigFlags := &genericclioptions.ConfigFlags{ + Namespace: &testNS, + Context: &testContext, + KubeConfig: &fakeFile, + BearerToken: &testBT, + Impersonate: &testImpersonate, + ImpersonateGroup: &[]string{"fake-group"}, + } + + tests := []struct { + name string + opts *ClusterLogOptions + expect string + expectError string + }{ + { + name: "Test validation when no context is set", + opts: &ClusterLogOptions{ + configFlags: genericclioptions.NewConfigFlags(false), + outputDir: fakeDir, + args: []string{"fake-cluster"}, + nodeType: "head", + ioStreams: &testStreams, + }, + expectError: "no context is currently set, use \"kubectl config use-context \" to select a new one", + }, + { + name: "Test validation when more than 1 arg", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeDir, + args: []string{"fake-cluster", "another-fake"}, + nodeType: "head", + ioStreams: &testStreams, + }, + expectError: "must have at only one argument", + }, + { + name: "Test validation when node type is `all`", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeDir, + args: []string{"fake-cluster"}, + nodeType: "all", + ioStreams: &testStreams, + }, + expectError: "node type `all` is currently not supported", + }, + { + name: "Test validation when node type is `worker`", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeDir, + args: []string{"fake-cluster"}, + nodeType: "worker", + ioStreams: &testStreams, + }, + expectError: "node type `worker` is currently not supported", + }, + { + name: "Test validation when node type is `random-string`", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeDir, + args: []string{"fake-cluster"}, + nodeType: "random-string", + ioStreams: &testStreams, + }, + expectError: "unknown node type `random-string`", + }, + { + name: "Successful validation call", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeDir, + args: []string{"random_arg"}, + nodeType: "head", + ioStreams: &testStreams, + }, + expectError: "", + }, + { + name: "Validate output directory when no out-dir i set.", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: "", + args: []string{"cluster-name"}, + nodeType: "head", + ioStreams: &testStreams, + }, + expectError: "", + }, + { + name: "Failed validation call with output directory not exist", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: "randomPath-here", + args: []string{"random_arg"}, + nodeType: "head", + ioStreams: &testStreams, + }, + expectError: "Directory does not exist. Failed with: stat randomPath-here: no such file or directory", + }, + { + name: "Failed validation call with output directory is file", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeFile, + args: []string{"random_arg"}, + nodeType: "head", + ioStreams: &testStreams, + }, + expectError: "Path is Not a directory. Please input a directory and try again", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.opts.Validate() + if tc.expectError != "" { + assert.Equal(t, tc.expectError, err.Error()) + } else { + if tc.opts.outputDir == "" { + assert.Equal(t, tc.opts.args[0], tc.opts.outputDir) + } + assert.True(t, err == nil) + } + }) + } +} + +func TestRayClusterLogRun(t *testing.T) { + tf := cmdtesting.NewTestFactory().WithNamespace("test") + defer tf.Cleanup() + + fakeDir, err := os.MkdirTemp("", "fake-directory") + assert.Nil(t, err) + defer os.RemoveAll(fakeDir) + + testStreams, _, _, _ := genericiooptions.NewTestIOStreams() + + fakeClusterLogOptions := NewClusterLogOptions(testStreams) + // Uses the mocked executor + fakeClusterLogOptions.Executor = &FakeRemoteExecutor{} + fakeClusterLogOptions.args = []string{"test-cluster"} + fakeClusterLogOptions.outputDir = fakeDir + + // Create list of fake ray heads + rayHeadsList := &v1.PodList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "15", + }, + Items: []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-kuberay-head-1", + Namespace: "test", + Labels: map[string]string{ + "ray.io/group": "headgroup", + "ray.io/clusters": "test-cluster", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "mycontainer", + Image: "nginx:latest", + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.0.0.1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-kuberay-head-2", + Namespace: "test", + Labels: map[string]string{ + "ray.io/group": "headgroup", + "ray.io/clusters": "test-cluster", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "anothercontainer", + Image: "busybox:latest", + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + }, + } + + // create logs for multiple head pods and turn them into io streams so they can be returned with the fake client + fakeLogs := []string{ + "This is some fake log data for first pod.\nStill first pod logs\n", + "This is some fake log data for second pod.\nStill second pod logs\n", + } + logReader1 := io.NopCloser(bytes.NewReader([]byte(fakeLogs[0]))) + logReader2 := io.NopCloser(bytes.NewReader([]byte(fakeLogs[1]))) + + // fakes the client and the REST calls. + codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...) + tf.Client = &fake.RESTClient{ + GroupVersion: v1.SchemeGroupVersion, + NegotiatedSerializer: resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch req.URL.Path { + case "/api/v1/pods": + return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, rayHeadsList)}, nil + case "/api/v1/namespaces/test/pods/test-cluster-kuberay-head-1/log": + return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: logReader1}, nil + case "/api/v1/namespaces/test/pods/test-cluster-kuberay-head-2/log": + return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: logReader2}, nil + default: + t.Fatalf("request url: %#v,and request: %#v", req.URL, req) + return nil, nil + } + }), + } + + tf.ClientConfigVal = &restclient.Config{ + ContentConfig: restclient.ContentConfig{GroupVersion: &v1.SchemeGroupVersion}, + } + + err = fakeClusterLogOptions.Run(context.Background(), tf) + assert.Nil(t, err) + + // Check that the two directories are there + entries, err := os.ReadDir(fakeDir) + assert.Nil(t, err) + assert.Equal(t, 2, len(entries)) + + assert.Equal(t, "test-cluster-kuberay-head-1", entries[0].Name()) + assert.Equal(t, "test-cluster-kuberay-head-2", entries[1].Name()) + + // Check the first directory for the logs + for ind, entry := range entries { + currPath := filepath.Join(fakeDir, entry.Name()) + currDir, err := os.ReadDir(currPath) + assert.Nil(t, err) + assert.Equal(t, 1, len(currDir)) + openfile, err := os.Open(filepath.Join(currPath, "stdout.log")) + assert.Nil(t, err) + actualContent, err := io.ReadAll(openfile) + assert.Nil(t, err) + assert.Equal(t, fakeLogs[ind], string(actualContent)) + } +} + +func TestDownloadRayLogFiles(t *testing.T) { + fakeDir, err := os.MkdirTemp("", "fake-directory") + assert.Nil(t, err) + defer os.RemoveAll(fakeDir) + + testStreams, _, _, _ := genericiooptions.NewTestIOStreams() + + fakeClusterLogOptions := NewClusterLogOptions(testStreams) + fakeClusterLogOptions.args = []string{"test-cluster"} + fakeClusterLogOptions.outputDir = fakeDir + + // create fake tar files to test + fakeTar, err := createFakeTarFile() + assert.Nil(t, err) + + // Ray head needed for calling the downloadRayLogFiles command + rayHead := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-kuberay-head-1", + Namespace: "test", + Labels: map[string]string{ + "ray.io/group": "headgroup", + "ray.io/clusters": "test-cluster", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "mycontainer", + Image: "nginx:latest", + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.0.0.1", + }, + } + + executor, _ := fakeNewSPDYExecutor("GET", &url.URL{}, fakeTar) + + err = fakeClusterLogOptions.downloadRayLogFiles(context.Background(), executor, rayHead) + assert.Nil(t, err) + + entries, err := os.ReadDir(fakeDir) + assert.Nil(t, err) + assert.Equal(t, 1, len(entries)) + + // Assert the files + assert.True(t, entries[0].IsDir()) + files, err := os.ReadDir(filepath.Join(fakeDir, entries[0].Name())) + assert.Nil(t, err) + assert.Equal(t, 2, len(files)) + + expectedfileoutput := []struct { + Name string + Body string + }{ + {"file1.txt", "This is the content of file1.txt\n"}, + {"file2.txt", "Content of file2.txt inside subdir\n"}, + } + + // Goes through and check the temp directory with the downloaded files + for ind, file := range files { + fileInfo, err := file.Info() + assert.Nil(t, err) + curr := expectedfileoutput[ind] + + assert.Equal(t, curr.Name, fileInfo.Name()) + openfile, err := os.Open(filepath.Join(fakeDir, entries[0].Name(), file.Name())) + assert.Nil(t, err) + actualContent, err := io.ReadAll(openfile) + assert.Nil(t, err) + assert.Equal(t, curr.Body, string(actualContent)) + } +} diff --git a/kubectl-plugin/pkg/cmd/ray.go b/kubectl-plugin/pkg/cmd/ray.go index 100f57e519b..59b4d616221 100644 --- a/kubectl-plugin/pkg/cmd/ray.go +++ b/kubectl-plugin/pkg/cmd/ray.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/cluster" + "github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/log" "github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/session" ) @@ -22,5 +23,6 @@ func NewRayCommand(streams genericiooptions.IOStreams) *cobra.Command { cmd.AddCommand(cluster.NewClusterCommand(streams)) cmd.AddCommand(session.NewSessionCommand(streams)) + cmd.AddCommand(log.NewClusterLogCommand(streams)) return cmd }