-
Notifications
You must be signed in to change notification settings - Fork 419
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
541 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
package cluster | ||
|
||
import ( | ||
"archive/tar" | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"os" | ||
"path" | ||
"path/filepath" | ||
|
||
"github.com/spf13/cobra" | ||
corev1 "k8s.io/api/core/v1" | ||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/cli-runtime/pkg/genericclioptions" | ||
clientgoscheme "k8s.io/client-go/kubernetes/scheme" | ||
"k8s.io/client-go/tools/remotecommand" | ||
cmdutil "k8s.io/kubectl/pkg/cmd/util" | ||
) | ||
|
||
type ClusterLogOptions struct { | ||
configFlags *genericclioptions.ConfigFlags | ||
ioStreams *genericclioptions.IOStreams | ||
outputDir string | ||
args []string | ||
} | ||
|
||
func NewClusterLogOptions(streams genericclioptions.IOStreams) *ClusterLogOptions { | ||
return &ClusterLogOptions{ | ||
configFlags: genericclioptions.NewConfigFlags(true), | ||
ioStreams: &streams, | ||
} | ||
} | ||
|
||
func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command { | ||
options := NewClusterLogOptions(streams) | ||
// Initialize the factory for later use with the current config flag | ||
cmdFactory := cmdutil.NewFactory(options.configFlags) | ||
|
||
cmd := &cobra.Command{ | ||
Use: "log (RAY_CLUSTER_NAME) --out-dir [directory]", | ||
Short: "Get cluster log", | ||
Aliases: []string{"logs"}, | ||
SilenceUsage: true, | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
if err := options.Complete(args); err != nil { | ||
return err | ||
} | ||
if err := options.Validate(); err != nil { | ||
return err | ||
} | ||
return options.Run(cmd.Context(), cmdFactory) | ||
}, | ||
} | ||
cmd.Flags().StringVar(&options.outputDir, "out-dir", options.outputDir, "File Directory PATH of where to download the file logs to.") | ||
options.configFlags.AddFlags(cmd.Flags()) | ||
return cmd | ||
} | ||
|
||
func (options *ClusterLogOptions) Complete(args []string) error { | ||
options.args = args | ||
return nil | ||
} | ||
|
||
func (options *ClusterLogOptions) Validate() error { | ||
// Overrides and binds the kube config then retrieves the merged result | ||
config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig() | ||
if err != nil { | ||
return fmt.Errorf("Error retrieving raw config: %w", err) | ||
} | ||
if len(config.CurrentContext) == 0 { | ||
return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>") | ||
} | ||
// we validate the directory | ||
info, err := os.Stat(options.outputDir) | ||
if os.IsNotExist(err) { | ||
return fmt.Errorf("Directory does not exist. Failed with %w", err) | ||
} else if err != nil { | ||
return fmt.Errorf("Error occurred will checking directory %w", err) | ||
} else if !info.IsDir() { | ||
return fmt.Errorf("Path is Not a directory. Please input a directory and try again") | ||
} | ||
|
||
// Command must have ray cluster name | ||
if len(options.args) > 1 { | ||
return fmt.Errorf("must have at most one argument") | ||
} | ||
return nil | ||
} | ||
|
||
func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Factory) error { | ||
kubeClientSet, err := factory.KubernetesClientSet() | ||
if err != nil { | ||
return fmt.Errorf("failed to retrieve kubernetes client set: %w", err) | ||
} | ||
|
||
var listopts v1.ListOptions | ||
if len(options.args) == 1 { | ||
listopts = v1.ListOptions{ | ||
LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]), | ||
} | ||
} else { | ||
listopts = v1.ListOptions{ | ||
LabelSelector: "ray.io/group=headgroup", | ||
} | ||
} | ||
|
||
// Get list of nodes that are considered ray heads | ||
rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts) | ||
if err != nil { | ||
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err) | ||
} | ||
|
||
// get a list of logs of the ray heads. | ||
var logList []*bytes.Buffer | ||
for _, rayHead := range rayHeads.Items { | ||
request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{}) | ||
|
||
podLogs, err := request.Stream(ctx) | ||
if err != nil { | ||
return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err) | ||
} | ||
defer podLogs.Close() | ||
|
||
// Get current logs: | ||
buf := new(bytes.Buffer) | ||
_, err = io.Copy(buf, podLogs) | ||
if err != nil { | ||
return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err) | ||
} | ||
|
||
logList = append(logList, buf) | ||
} | ||
|
||
// Pod file name format is name of the ray head | ||
for ind, logList := range logList { | ||
curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name) | ||
file, err := os.OpenFile(curFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) | ||
if err != nil { | ||
return fmt.Errorf("failed to create/open file for kuberay-head with path: %s: %w", rayHeads.Items[ind].Name, err) | ||
} | ||
defer file.Close() | ||
|
||
_, err = logList.WriteTo(file) | ||
if err != nil { | ||
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err) | ||
} | ||
|
||
// Copy over the rest of the files under /tmp/ray/session_latest/logs | ||
filePathInPod := "/tmp/ray/session_latest/logs/" | ||
|
||
req := kubeClientSet.CoreV1().RESTClient(). | ||
Get(). | ||
Namespace(rayHeads.Items[ind].Namespace). | ||
Resource("pods"). | ||
Name(rayHeads.Items[ind].Name). | ||
SubResource("exec"). | ||
VersionedParams(&corev1.PodExecOptions{ | ||
Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."}, | ||
Stdin: true, | ||
Stdout: true, | ||
Stderr: true, | ||
TTY: false, | ||
}, clientgoscheme.ParameterCodec) | ||
|
||
// Execute the command | ||
restconfig, err := factory.ToRESTConfig() | ||
if err != nil { | ||
return fmt.Errorf("failed to get restconfig: %w", err) | ||
} | ||
|
||
exec, err := remotecommand.NewSPDYExecutor(restconfig, "POST", req.URL()) | ||
if err != nil { | ||
return fmt.Errorf("failed to call request with error: %w", err) | ||
} | ||
|
||
// Create synchronous writer/reader for downloading the files | ||
outreader, outStream := io.Pipe() | ||
go func() { | ||
defer outStream.Close() | ||
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ | ||
Stdin: options.ioStreams.In, | ||
Stdout: outStream, | ||
Stderr: options.ioStreams.ErrOut, | ||
Tty: false, | ||
}) | ||
}() | ||
|
||
// Goes through the tar and create/copy them one by one into the destination dir | ||
tarReader := tar.NewReader(outreader) | ||
header, err := tarReader.Next() | ||
if err != nil && !errors.Is(err, io.EOF) { | ||
return fmt.Errorf("error will extracting tar file %w", err) | ||
} | ||
for !errors.Is(err, io.EOF) { | ||
fmt.Printf("Downloading file %s for Ray Head %s\n", header.Name, rayHeads.Items[ind].Name) | ||
if err != nil { | ||
return fmt.Errorf("Error reading tar archive: %w", err) | ||
} | ||
|
||
// Construct the full local path and a directory for the tmp file logs | ||
logfile := rayHeads.Items[ind].Name + "-logs" | ||
localFilePath := filepath.Join(path.Clean(options.outputDir), path.Clean(logfile), path.Clean(header.Name)) | ||
|
||
switch header.Typeflag { | ||
case tar.TypeDir: | ||
// Create directory | ||
if err := os.MkdirAll(localFilePath, 0o755); err != nil { | ||
return fmt.Errorf("Error creating directory: %w", err) | ||
} | ||
case tar.TypeReg: | ||
// Create file and write contents | ||
outFile, err := os.OpenFile(localFilePath, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) | ||
if err != nil { | ||
return fmt.Errorf("Error creating file: %w", err) | ||
} | ||
defer outFile.Close() | ||
// This is to limit the copy size for a decompression bomb, currently set arbitrarily | ||
_, err = io.CopyN(outFile, tarReader, 1000000) | ||
|
||
if err != nil && !errors.Is(err, io.EOF) { | ||
return fmt.Errorf("failed while writing to file: %w", err) | ||
} | ||
default: | ||
fmt.Printf("Ignoring unsupported file type: %b", header.Typeflag) | ||
} | ||
header, err = tarReader.Next() | ||
if err != nil && !errors.Is(err, io.EOF) { | ||
return fmt.Errorf("error will extracting tar file %w", err) | ||
} | ||
} | ||
} | ||
return nil | ||
} |
Oops, something went wrong.