From 6e73ff4a168baaa3c61d5c02da1b91923d7b737f Mon Sep 17 00:00:00 2001 From: Aaron Liang Date: Fri, 9 Aug 2024 10:13:01 -0700 Subject: [PATCH] Add kubectl ray cluster log command --- kubectl-plugin/README.md | 31 +++ kubectl-plugin/pkg/cmd/cluster/cluster.go | 1 + kubectl-plugin/pkg/cmd/cluster/log.go | 236 ++++++++++++++++++ kubectl-plugin/pkg/cmd/cluster/log_test.go | 273 +++++++++++++++++++++ 4 files changed, 541 insertions(+) create mode 100644 kubectl-plugin/pkg/cmd/cluster/log.go create mode 100644 kubectl-plugin/pkg/cmd/cluster/log_test.go diff --git a/kubectl-plugin/README.md b/kubectl-plugin/README.md index ca606d3db3..a7a7404ead 100644 --- a/kubectl-plugin/README.md +++ b/kubectl-plugin/README.md @@ -44,3 +44,34 @@ Kubectl plugin/extension for Kuberay CLI that provides the ability to manage ray --tls-server-name string Server name to use for server certificate validation. If it is not provided, the hostname used to contact the server is used --token string Bearer token for authentication to the API server --user string The name of the kubeconfig user to use + +### Retrieve Ray Cluster Head Logs + + Usage: + ray cluster log (CLUSTER_NAME) [flags] + + Aliases: + log, logs + + Flags: + --as string Username to impersonate for the operation. User could be a regular user or a service account in a namespace. + --as-group stringArray Group to impersonate for the operation, this flag can be repeated to specify multiple groups. + --as-uid string UID to impersonate for the operation. + --cache-dir string Default cache directory (default "/Users/aaronliang/.kube/cache") + --certificate-authority string Path to a cert file for the certificate authority + --client-certificate string Path to a client certificate file for TLS + --client-key string Path to a client key file for TLS + --cluster string The name of the kubeconfig cluster to use + --context string The name of the kubeconfig context to use + --disable-compression If true, opt-out of response compression for all requests to the server + -h, --help help for log + --insecure-skip-tls-verify If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure + --kubeconfig string Path to the kubeconfig file to use for CLI requests. + -n, --namespace string If present, the namespace scope for this CLI request + --out-dir string File Directory PATH of where to download the file logs to. + --request-timeout string The length of time to wait before giving up on a single server request. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). A value of zero means don't timeout requests. (default "0") + -s, --server string The address and port of the Kubernetes API server + --tls-server-name string Server name to use for server certificate validation. If it is not provided, the hostname used to contact the server is used + --token string Bearer token for authentication to the API server + --user string The name of the kubeconfig user to use + aaronliang-macbookpro:kubectl-plugin aaronliang$ kubectl ray cluster log raycluster-sample diff --git a/kubectl-plugin/pkg/cmd/cluster/cluster.go b/kubectl-plugin/pkg/cmd/cluster/cluster.go index 264d80abe1..43d2f3aa53 100644 --- a/kubectl-plugin/pkg/cmd/cluster/cluster.go +++ b/kubectl-plugin/pkg/cmd/cluster/cluster.go @@ -23,5 +23,6 @@ func NewClusterCommand(streams genericclioptions.IOStreams) *cobra.Command { } cmd.AddCommand(NewClusterGetCommand(streams)) + cmd.AddCommand(NewClusterLogCommand(streams)) return cmd } diff --git a/kubectl-plugin/pkg/cmd/cluster/log.go b/kubectl-plugin/pkg/cmd/cluster/log.go new file mode 100644 index 0000000000..dc85edca3f --- /dev/null +++ b/kubectl-plugin/pkg/cmd/cluster/log.go @@ -0,0 +1,236 @@ +package cluster + +import ( + "archive/tar" + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "path" + "path/filepath" + + "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +type ClusterLogOptions struct { + configFlags *genericclioptions.ConfigFlags + ioStreams *genericclioptions.IOStreams + outputDir string + args []string +} + +func NewClusterLogOptions(streams genericclioptions.IOStreams) *ClusterLogOptions { + return &ClusterLogOptions{ + configFlags: genericclioptions.NewConfigFlags(true), + ioStreams: &streams, + } +} + +func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command { + options := NewClusterLogOptions(streams) + // Initialize the factory for later use with the current config flag + cmdFactory := cmdutil.NewFactory(options.configFlags) + + cmd := &cobra.Command{ + Use: "log (RAY_CLUSTER_NAME) --out-dir [directory]", + Short: "Get cluster log", + Aliases: []string{"logs"}, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + if err := options.Complete(args); err != nil { + return err + } + if err := options.Validate(); err != nil { + return err + } + return options.Run(cmd.Context(), cmdFactory) + }, + } + cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.") + options.configFlags.AddFlags(cmd.Flags()) + return cmd +} + +func (options *ClusterLogOptions) Complete(args []string) error { + options.args = args + return nil +} + +func (options *ClusterLogOptions) Validate() error { + // Overrides and binds the kube config then retrieves the merged result + config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig() + if err != nil { + return fmt.Errorf("Error retrieving raw config: %w", err) + } + if len(config.CurrentContext) == 0 { + return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context ") + } + // we validate the directory + info, err := os.Stat(options.outputDir) + if os.IsNotExist(err) { + return fmt.Errorf("Directory does not exist. Failed with %w", err) + } else if err != nil { + return fmt.Errorf("Error occurred will checking directory %w", err) + } else if !info.IsDir() { + return fmt.Errorf("Path is Not a directory. Please input a directory and try again") + } + + // Command must have ray cluster name + if len(options.args) > 1 { + return fmt.Errorf("must have at most one argument") + } + return nil +} + +func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error { + kubeClientSet, err := factory.KubernetesClientSet() + if err != nil { + return fmt.Errorf("failed to retrieve kubernetes client set: %w", err) + } + + var listopts v1.ListOptions + if len(options.args) == 1 { + listopts = v1.ListOptions{ + LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]), + } + } else { + listopts = v1.ListOptions{ + LabelSelector: "ray.io/group=headgroup", + } + } + + // Get list of nodes that are considered ray heads + rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts) + if err != nil { + return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err) + } + + // get a list of logs of the ray heads. + var logList []*bytes.Buffer + for _, rayHead := range rayHeads.Items { + request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{}) + + podLogs, err := request.Stream(ctx) + if err != nil { + return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err) + } + defer podLogs.Close() + + // Get current logs: + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err) + } + + logList = append(logList, buf) + } + + // Pod file name format is name of the ray head + for ind, logList := range logList { + curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name) + file, err := os.OpenFile(curFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) + if err != nil { + return fmt.Errorf("failed to create/open file for kuberay-head with path: %s: %w", rayHeads.Items[ind].Name, err) + } + defer file.Close() + + _, err = logList.WriteTo(file) + if err != nil { + return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err) + } + + // Copy over the rest of the files under /tmp/ray/session_latest/logs + filePathInPod := "/tmp/ray/session_latest/logs/" + + req := kubeClientSet.CoreV1().RESTClient(). + Get(). + Namespace(rayHeads.Items[ind].Namespace). + Resource("pods"). + Name(rayHeads.Items[ind].Name). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."}, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + }, clientgoscheme.ParameterCodec) + + // Execute the command + restconfig, err := factory.ToRESTConfig() + if err != nil { + return fmt.Errorf("failed to get restconfig: %w", err) + } + + exec, err := remotecommand.NewSPDYExecutor(restconfig, "POST", req.URL()) + if err != nil { + return fmt.Errorf("failed to call request with error: %w", err) + } + + // Create synchronous writer/reader for downloading the files + outreader, outStream := io.Pipe() + go func() { + defer outStream.Close() + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: options.ioStreams.In, + Stdout: outStream, + Stderr: options.ioStreams.ErrOut, + Tty: false, + }) + }() + + // Goes through the tar and create/copy them one by one into the destination dir + tarReader := tar.NewReader(outreader) + header, err := tarReader.Next() + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("error will extracting tar file %w", err) + } + for !errors.Is(err, io.EOF) { + fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayHeads.Items[ind].Name) + if err != nil { + return fmt.Errorf("Error reading tar archive: %w", err) + } + + // Construct the full local path and a directory for the tmp file logs + logfile := rayHeads.Items[ind].Name + "-logs" + localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(logfile), path.Clean(header.Name)) + + switch header.Typeflag { + case tar.TypeDir: + // Create directory + if err := os.MkdirAll(localFilePath, 0o755); err != nil { + return fmt.Errorf("Error creating directory: %w", err) + } + case tar.TypeReg: + // Create file and write contents + outFile, err := os.OpenFile(localFilePath, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) + if err != nil { + return fmt.Errorf("Error creating file: %w", err) + } + defer outFile.Close() + // This is to limit the copy size for a decompression bomb, currently set arbitrarily + _, err = io.CopyN(outFile, tarReader, 1000000) + + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("failed while writing to file: %w", err) + } + default: + fmt.Printf("Ignoring unsupported file type: %b", header.Typeflag) + } + header, err = tarReader.Next() + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("error will extracting tar file %w", err) + } + } + } + return nil +} diff --git a/kubectl-plugin/pkg/cmd/cluster/log_test.go b/kubectl-plugin/pkg/cmd/cluster/log_test.go new file mode 100644 index 0000000000..9348f46d78 --- /dev/null +++ b/kubectl-plugin/pkg/cmd/cluster/log_test.go @@ -0,0 +1,273 @@ +package cluster + +import ( + "bytes" + "context" + "io" + "net/http" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/cli-runtime/pkg/resource" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/rest/fake" + restfake "k8s.io/client-go/rest/fake" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/clientcmd/api" + cmdtesting "k8s.io/kubectl/pkg/cmd/testing" + "k8s.io/kubectl/pkg/scheme" +) + +func TestRayClusterLogComplete(t *testing.T) { + testStreams, _, _, _ := genericclioptions.NewTestIOStreams() + fakeClusterLogOptions := NewClusterLogOptions(testStreams) + fakeArgs := []string{"Expected output"} + + fakeClusterLogOptions.outputDir = "Second Expected Output" + + err := fakeClusterLogOptions.Complete(fakeArgs) + if err != nil { + t.Fatalf("Error calling Complete(): %v", err) + } + + assert.Equal(t, fakeClusterLogOptions.args, fakeArgs) +} + +func TestRayClusterLogValidate(t *testing.T) { + testStreams, _, _, _ := genericclioptions.NewTestIOStreams() + + testNS, testContext, testBT, testImpersonate := "test-namespace", "test-contet", "test-bearer-token", "test-person" + + // Fake directory for kubeconfig + fakeDir, err := os.MkdirTemp("", "fake-config") + if err != nil { + t.Fatalf("Error setting up make kubeconfig: %v", err) + } + defer os.RemoveAll(fakeDir) + + // Set up fake config for kubeconfig + config := &api.Config{ + Clusters: map[string]*api.Cluster{ + "test-cluster": { + Server: "https://fake-kubernetes-cluster.example.com", + InsecureSkipTLSVerify: true, // For testing purposes + }, + }, + Contexts: map[string]*api.Context{ + "my-fake-context": { + Cluster: "my-fake-cluster", + AuthInfo: "my-fake-user", + }, + }, + CurrentContext: "my-fake-context", + AuthInfos: map[string]*api.AuthInfo{ + "my-fake-user": { + Token: "", // Empty for testing without authentication + }, + }, + } + + fakeFile := filepath.Join(fakeDir, ".kubeconfig") + + if err := clientcmd.WriteToFile(*config, fakeFile); err != nil { + t.Fatalf("Failed to write kubeconfig to temp file: %v", err) + } + + // Initialize the fake config flag with the fake kubeconfig and values + fakeConfigFlags := &genericclioptions.ConfigFlags{ + Namespace: &testNS, + Context: &testContext, + KubeConfig: &fakeFile, + BearerToken: &testBT, + Impersonate: &testImpersonate, + ImpersonateGroup: &[]string{"fake-group"}, + } + + tests := []struct { + name string + opts *ClusterLogOptions + expect string + expectError string + }{ + { + name: "Test validation when no context is set", + opts: &ClusterLogOptions{ + configFlags: genericclioptions.NewConfigFlags(false), + outputDir: "", + args: []string{"fake-cluster"}, + ioStreams: &testStreams, + }, + expectError: "no context is currently set, use \"kubectl config use-context \" to select a new one", + }, + { + name: "Test validation when more than 1 arg", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: "", + args: []string{"fake-cluster", "another-fake"}, + ioStreams: &testStreams, + }, + expectError: "must have at most one argument", + }, + { + name: "Successful validation call", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: "", + args: []string{"random_arg"}, + ioStreams: &testStreams, + }, + expectError: "", + }, + { + name: "Successfule validation call with output directory", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeDir, + args: []string{"random_arg"}, + ioStreams: &testStreams, + }, + expectError: "", + }, + { + name: "Failed validation call with output directory not exist", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: "randomPath-here", + args: []string{"random_arg"}, + ioStreams: &testStreams, + }, + expectError: "Directory does not exist. Failed with stat randomPath-here: no such file or directory", + }, + { + name: "Failed validation call with output directory is file", + opts: &ClusterLogOptions{ + // Use fake config to bypass the config flag checks + configFlags: fakeConfigFlags, + outputDir: fakeFile, + args: []string{"random_arg"}, + ioStreams: &testStreams, + }, + expectError: "Path is Not a directory. Please input a directory and try again", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.opts.Validate() + if tc.expectError != "" { + assert.Equal(t, tc.expectError, err.Error()) + } else { + assert.True(t, err == nil) + } + }) + } +} + +func TestRayClusterLogRun(t *testing.T) { + tf := cmdtesting.NewTestFactory().WithNamespace("test") + defer tf.Cleanup() + + testStreams, _, resBuf, _ := genericclioptions.NewTestIOStreams() + + fakeClusterLogOptions := NewClusterLogOptions(testStreams) + fakeClusterLogOptions.args = []string{"test-cluster"} + + // Create list of fake ray heads + rayHeadsList := &v1.PodList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "15", + }, + Items: []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-kuberay-head-1", + Namespace: "test", + Labels: map[string]string{ + "ray.io/group": "headgroup", + "ray.io/clusters": "test-cluster", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "mycontainer", + Image: "nginx:latest", + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.0.0.1", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-kuberay-head-2", + Namespace: "test", + Labels: map[string]string{ + "ray.io/group": "headgroup", + "ray.io/clusters": "test-cluster", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "anothercontainer", + Image: "busybox:latest", + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + }, + } + + // create logs for multiple head pods and turn them into io streams so they can be returned with the fake client + fakeLogsHead1 := "This is some fake log data for first pod.\nStill first pod logs\n" + fakeLogsHead2 := "This is some fake log data for second pod.\nStill second pod logs\n" + logReader1 := io.NopCloser(bytes.NewReader([]byte(fakeLogsHead1))) + logReader2 := io.NopCloser(bytes.NewReader([]byte(fakeLogsHead2))) + + // fakes the client and the REST calls. + codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...) + tf.Client = &restfake.RESTClient{ + GroupVersion: v1.SchemeGroupVersion, + NegotiatedSerializer: resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch req.URL.Path { + case "/api/v1/pods": + return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, rayHeadsList)}, nil + case "/api/v1/namespaces/test/pods/test-cluster-kuberay-head-1/log": + return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: logReader1}, nil + case "/api/v1/namespaces/test/pods/test-cluster-kuberay-head-2/log": + return &http.Response{StatusCode: http.StatusOK, Header: cmdtesting.DefaultHeader(), Body: logReader2}, nil + default: + t.Fatalf("request url: %#v,and request: %#v", req.URL, req) + return nil, nil + } + }), + } + tf.ClientConfigVal = &restclient.Config{ContentConfig: restclient.ContentConfig{GroupVersion: &v1.SchemeGroupVersion}} + + err := fakeClusterLogOptions.Run(context.Background(), tf) + if err != nil { + t.Fatalf("Error calling Run(): %v", err) + } + + expectedResult := "Head Name: test-cluster-kuberay-head-1\n" + fakeLogsHead1 + "Head Name: test-cluster-kuberay-head-2\n" + fakeLogsHead2 + if e, a := expectedResult, resBuf.String(); e != a { + t.Errorf("\nexpected\n%v\ngot\n%v", e, a) + } +}