From bd197421d9afe554a1833311e65069887d90ff31 Mon Sep 17 00:00:00 2001 From: Andrei-Lucian Mociu Date: Fri, 9 Jun 2023 18:42:36 +0000 Subject: [PATCH 1/2] Diagnostics, Pastebin, SFTP Manager and more... --- cmd/diagnostics.go | 108 +++++++----- cmd/root.go | 18 +- config/config.go | 110 +----------- config/config_cluster.go | 2 + environment/cluster.go | 26 +-- environment/environment.go | 10 +- environment/kubernetes/environment.go | 10 +- environment/kubernetes/pod.go | 176 +++++++++++--------- environment/kubernetes/securitycontext.go | 50 ++++++ go.mod | 3 + go.sum | 6 + router/router_server.go | 2 +- router/router_server_backup.go | 30 ++-- router/router_system.go | 40 +++-- server/filesystem/compress.go | 109 ++++++------ server/filesystem/filesystem.go | 150 ++++------------- server/filesystem/sftp_manager.go | 194 ++++++++++++++++++++-- server/filesystem/stat.go | 14 +- server/install.go | 78 +++++---- server/listeners.go | 7 +- server/manager.go | 4 +- server/power.go | 16 +- server/server.go | 8 +- server/snapshot.go | 2 +- server/snapshot/local.go | 51 +++--- system/system.go | 105 +++++++++++- 26 files changed, 751 insertions(+), 578 deletions(-) create mode 100644 environment/kubernetes/securitycontext.go diff --git a/cmd/diagnostics.go b/cmd/diagnostics.go index 9025f86d..abb4b5c6 100644 --- a/cmd/diagnostics.go +++ b/cmd/diagnostics.go @@ -1,12 +1,14 @@ package cmd import ( + "bytes" "errors" "fmt" "io" + "math/rand" + "mime/multipart" "net/http" "net/url" - "os" "os/exec" "path" "strconv" @@ -28,7 +30,7 @@ import ( ) const ( - DefaultHastebinUrl = "https://ptero.co" + DefaultPastebinUrl = "https://pb.kubectyl.org" DefaultLogLines = 200 ) @@ -36,7 +38,7 @@ var diagnosticsArgs struct { IncludeEndpoints bool IncludeLogs bool ReviewBeforeUpload bool - HastebinURL string + PastebinURL string LogLines int } @@ -51,7 +53,7 @@ func newDiagnosticsCommand() *cobra.Command { Run: diagnosticsCmdRun, } - command.Flags().StringVar(&diagnosticsArgs.HastebinURL, "hastebin-url", DefaultHastebinUrl, "the url of the hastebin instance to use") + command.Flags().StringVar(&diagnosticsArgs.PastebinURL, "hastebin-url", DefaultPastebinUrl, "the url of the hastebin instance to use") command.Flags().IntVar(&diagnosticsArgs.LogLines, "log-lines", DefaultLogLines, "the number of log lines to include in the report") return command @@ -77,7 +79,7 @@ func diagnosticsCmdRun(*cobra.Command, []string) { { Name: "ReviewBeforeUpload", Prompt: &survey.Confirm{ - Message: "Do you want to review the collected data before uploading to " + diagnosticsArgs.HastebinURL + "?", + Message: "Do you want to review the collected data before uploading to " + diagnosticsArgs.PastebinURL + "?", Help: "The data, especially the logs, might contain sensitive information, so you should review it. 
You will be asked again if you want to upload.", Default: true, }, @@ -112,27 +114,17 @@ func diagnosticsCmdRun(*cobra.Command, []string) { {"Logs Directory", cfg.System.LogDirectory}, {"Data Directory", cfg.System.Data}, {"Archive Directory", cfg.System.ArchiveDirectory}, - {"Backup Directory", cfg.System.BackupDirectory}, - {"Username", cfg.System.Username}, {"Server Time", time.Now().Format(time.RFC1123Z)}, {"Debug Mode", fmt.Sprintf("%t", cfg.Debug)}, } - table := tablewriter.NewWriter(os.Stdout) + table := tablewriter.NewWriter(output) table.SetHeader([]string{"Variable", "Value"}) table.SetRowLine(true) table.AppendBulk(data) table.Render() - printHeader(output, "Docker: Running Containers") - c := exec.Command("docker", "ps") - if co, err := c.Output(); err == nil { - output.Write(co) - } else { - fmt.Fprint(output, "Couldn't list containers: ", err) - } - printHeader(output, "Latest Kuber Logs") if diagnosticsArgs.IncludeLogs { p := "/var/log/kubectyl/kuber.log" @@ -162,16 +154,38 @@ func diagnosticsCmdRun(*cobra.Command, []string) { fmt.Println(output.String()) fmt.Print("--------------- end of report ---------------\n\n") - // upload := !diagnosticsArgs.ReviewBeforeUpload - // if !upload { - // survey.AskOne(&survey.Confirm{Message: "Upload to " + diagnosticsArgs.HastebinURL + "?", Default: false}, &upload) - // } - // if upload { - // u, err := uploadToHastebin(diagnosticsArgs.HastebinURL, output.String()) - // if err == nil { - // fmt.Println("Your report is available here: ", u) - // } - // } + upload := !diagnosticsArgs.ReviewBeforeUpload + if !upload { + survey.AskOne(&survey.Confirm{Message: "Upload to " + diagnosticsArgs.PastebinURL + "?", Default: false}, &upload) + } + if upload { + passwordFunc := func(length int) string { + charset := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + password := make([]byte, length) + + for i := 0; i < length; i++ { + password[i] = charset[rand.Intn(len(charset))] + } + + return string(password) + } + rand.Seed(time.Now().UnixNano()) + + password := passwordFunc(8) + result, err := uploadToPastebin(diagnosticsArgs.PastebinURL, output.String(), password) + if err == nil { + seconds, err := strconv.Atoi(fmt.Sprintf("%v", result["expire"])) + if err != nil { + return + } + + expireTime := fmt.Sprintf("%d hours, %d minutes, %d seconds", seconds/3600, (seconds%3600)/60, seconds%60) + + fmt.Println("Your report is available here:", result["url"]) + fmt.Println("Will expire in", expireTime) + fmt.Printf("You can edit your pastebin here: %s\n", result["admin"]) + } + } } // func getDockerInfo() (types.Version, types.Info, error) { @@ -190,31 +204,45 @@ func diagnosticsCmdRun(*cobra.Command, []string) { // return dockerVersion, dockerInfo, nil // } -func uploadToHastebin(hbUrl, content string) (string, error) { - r := strings.NewReader(content) - u, err := url.Parse(hbUrl) +func uploadToPastebin(pbURL, content, password string) (map[string]interface{}, error) { + payload := &bytes.Buffer{} + writer := multipart.NewWriter(payload) + writer.WriteField("c", content) + writer.WriteField("e", "300") + writer.WriteField("s", password) + writer.Close() + + u, err := url.Parse(pbURL) if err != nil { - return "", err + return nil, err } - u.Path = path.Join(u.Path, "documents") - res, err := http.Post(u.String(), "plain/text", r) + + req, err := http.NewRequest("POST", u.String(), payload) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + + client := &http.Client{} + res, err := 
client.Do(req) if err != nil || res.StatusCode != 200 { - fmt.Println("Failed to upload report to ", u.String(), err) - return "", err + fmt.Println("Failed to upload report to", u.String(), err) + return nil, err } + pres := make(map[string]interface{}) body, err := io.ReadAll(res.Body) if err != nil { fmt.Println("Failed to parse response.", err) - return "", err + return nil, err } json.Unmarshal(body, &pres) - if key, ok := pres["key"].(string); ok { - u, _ := url.Parse(hbUrl) - u.Path = path.Join(u.Path, key) - return u.String(), nil + if key, ok := pres["url"].(string); ok { + u.Path = key + return pres, nil } - return "", errors.New("failed to find key in response") + + return nil, errors.New("failed to find key in response") } func redact(s string) string { diff --git a/cmd/root.go b/cmd/root.go index 37f0c7bf..91dac9e4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -116,14 +116,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { if err := environment.CreateSftpSecret(); err != nil { log.WithField("error", err).Fatal("failed to create sftp secret") } - if err := config.EnsureKubectylUser(); err != nil { - log.WithField("error", err).Fatal("failed to create kubectyl system user") - } - log.WithFields(log.Fields{ - "username": config.Get().System.Username, - "uid": config.Get().System.User.Uid, - "gid": config.Get().System.User.Gid, - }).Info("configured system user successfully") if err := config.EnableLogRotation(); err != nil { log.WithField("error", err).Fatal("failed to configure log rotation on the system") return @@ -256,7 +248,10 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { // we use goroutine to avoid blocking whole process go func() { - if err := s.Environment.CreateSFTP(s.Context()); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := s.Environment.CreateSFTP(ctx, cancel); err != nil { log.WithField("error", err).Warn("failed to create server SFTP pod") } }() @@ -303,11 +298,6 @@ func rootCmdRun(cmd *cobra.Command, _ []string) { log.WithField("error", err).Error("failed to create archive directory") } - // Ensure the backup directory exists. - if err := os.MkdirAll(sys.BackupDirectory, 0o755); err != nil { - log.WithField("error", err).Error("failed to create backup directory") - } - autotls, _ := cmd.Flags().GetBool("auto-tls") tlshostname, _ := cmd.Flags().GetString("tls-hostname") if autotls && tlshostname == "" { diff --git a/config/config.go b/config/config.go index 92fc80ec..dfc1c3bc 100644 --- a/config/config.go +++ b/config/config.go @@ -6,11 +6,9 @@ import ( "fmt" "os" "os/exec" - "os/user" "path" "path/filepath" "regexp" - "strings" "sync" "text/template" "time" @@ -21,8 +19,6 @@ import ( "github.com/creasty/defaults" "github.com/gbrlsnchs/jwt/v3" "gopkg.in/yaml.v2" - - "github.com/kubectyl/kuber/system" ) const DefaultLocation = "/etc/kubectyl/config.yml" @@ -134,16 +130,10 @@ type SystemConfiguration struct { // Directory where server archives for transferring will be stored. ArchiveDirectory string `default:"/var/lib/kubectyl/archives" yaml:"archive_directory"` - // Directory where local backups will be stored on the machine. - BackupDirectory string `default:"/var/lib/kubectyl/backups" yaml:"backup_directory"` - // TmpDirectory specifies where temporary files for Kubectyl installation processes // should be created. This supports environments running docker-in-docker. 
TmpDirectory string `default:"/tmp/kubectyl" yaml:"tmp_directory"` - // The user that should own all of the server files, and be used for containers. - Username string `default:"kubectyl" yaml:"username"` - // The timezone for this Kuber instance. This is detected by Kuber automatically if possible, // and falls back to UTC if not able to be detected. If you need to set this manually, that // can also be done. @@ -151,27 +141,6 @@ type SystemConfiguration struct { // This timezone value is passed into all containers created by Kuber. Timezone string `yaml:"timezone"` - // Definitions for the user that gets created to ensure that we can quickly access - // this information without constantly having to do a system lookup. - User struct { - // Rootless controls settings related to rootless container daemons. - Rootless struct { - // Enabled controls whether rootless containers are enabled. - Enabled bool `yaml:"enabled" default:"false"` - // ContainerUID controls the UID of the user inside the container. - // This should likely be set to 0 so the container runs as the user - // running Kuber. - ContainerUID int `yaml:"container_uid" default:"0"` - // ContainerGID controls the GID of the user inside the container. - // This should likely be set to 0 so the container runs as the user - // running Kuber. - ContainerGID int `yaml:"container_gid" default:"0"` - } `yaml:"rootless"` - - Uid int `yaml:"uid"` - Gid int `yaml:"gid"` - } `yaml:"user"` - // The amount of time in seconds that can elapse before a server's disk space calculation is // considered stale and a re-check should occur. DANGER: setting this value too low can seriously // impact system performance and cause massive I/O bottlenecks and high CPU usage for the Kuber @@ -202,7 +171,7 @@ type SystemConfiguration struct { EnableLogRotate bool `default:"true" yaml:"enable_log_rotate"` // The number of lines to send when a server connects to the websocket. - WebsocketLogCount int `default:"150" yaml:"websocket_log_count"` + WebsocketLogCount int64 `default:"150" yaml:"websocket_log_count"` Sftp SftpConfiguration `yaml:"sftp"` @@ -422,78 +391,6 @@ func WriteToDisk(c *Configuration) error { return nil } -// EnsureKubectylUser ensures that the Kubectyl core user exists on the -// system. This user will be the owner of all data in the root data directory -// and is used as the user within containers. If files are not owned by this -// user there will be issues with permissions on Docker mount points. -// -// This function IS NOT thread safe and should only be called in the main thread -// when the application is booting. -func EnsureKubectylUser() error { - sysName, err := getSystemName() - if err != nil { - return err - } - - // Our way of detecting if kuber is running inside of Docker. 
- if sysName == "distroless" { - _config.System.Username = system.FirstNotEmpty(os.Getenv("KUBER_USERNAME"), "kubectyl") - _config.System.User.Uid = system.MustInt(system.FirstNotEmpty(os.Getenv("KUBER_UID"), "988")) - _config.System.User.Gid = system.MustInt(system.FirstNotEmpty(os.Getenv("KUBER_GID"), "988")) - return nil - } - - if _config.System.User.Rootless.Enabled { - log.Info("rootless mode is enabled, skipping user creation...") - u, err := user.Current() - if err != nil { - return err - } - _config.System.Username = u.Username - _config.System.User.Uid = system.MustInt(u.Uid) - _config.System.User.Gid = system.MustInt(u.Gid) - return nil - } - - log.WithField("username", _config.System.Username).Info("checking for kubectyl system user") - u, err := user.Lookup(_config.System.Username) - // If an error is returned but it isn't the unknown user error just abort - // the process entirely. If we did find a user, return it immediately. - if err != nil { - if _, ok := err.(user.UnknownUserError); !ok { - return err - } - } else { - _config.System.User.Uid = system.MustInt(u.Uid) - _config.System.User.Gid = system.MustInt(u.Gid) - return nil - } - - command := fmt.Sprintf("useradd --system --no-create-home --shell /usr/sbin/nologin %s", _config.System.Username) - // Alpine Linux is the only OS we currently support that doesn't work with the useradd - // command, so in those cases we just modify the command a bit to work as expected. - if strings.HasPrefix(sysName, "alpine") { - command = fmt.Sprintf("adduser -S -D -H -G %[1]s -s /sbin/nologin %[1]s", _config.System.Username) - // We have to create the group first on Alpine, so do that here before continuing on - // to the user creation process. - if _, err := exec.Command("addgroup", "-S", _config.System.Username).Output(); err != nil { - return err - } - } - - split := strings.Split(command, " ") - if _, err := exec.Command(split[0], split[1:]...).Output(); err != nil { - return err - } - u, err = user.Lookup(_config.System.Username) - if err != nil { - return err - } - _config.System.User.Uid = system.MustInt(u.Uid) - _config.System.User.Gid = system.MustInt(u.Gid) - return nil -} - // FromFile reads the configuration from the provided file and stores it in the // global singleton for this instance. 
func FromFile(path string) error { @@ -553,11 +450,6 @@ func ConfigureDirectories() error { return err } - log.WithField("path", _config.System.BackupDirectory).Debug("ensuring backup data directory exists") - if err := os.MkdirAll(_config.System.BackupDirectory, 0o700); err != nil { - return err - } - return nil } diff --git a/config/config_cluster.go b/config/config_cluster.go index 9f137134..9e5d30f2 100644 --- a/config/config_cluster.go +++ b/config/config_cluster.go @@ -51,6 +51,8 @@ type ClusterConfiguration struct { SnapshotClass string `json:"snapshot_class" yaml:"snapshot_class"` ExternalTrafficPolicy string `json:"external_traffic_policy" default:"cluster" yaml:"external_traffic_policy"` + + RestrictedPodSecurityStandard bool `default:"true" json:"restricted_standard" yaml:"restricted_standard"` } type ClusterNetworkConfiguration struct { diff --git a/environment/cluster.go b/environment/cluster.go index 8d41c650..5bcf9040 100644 --- a/environment/cluster.go +++ b/environment/cluster.go @@ -69,26 +69,12 @@ func Cluster() (c *rest.Config, clientset *kubernetes.Clientset, err error) { c.TLSClientConfig.KeyData = keyData } - tokenFile := "/var/run/secrets/kubernetes.io/serviceaccount/token" - - _, err = os.Stat(tokenFile) - if err == nil { - token, err := os.ReadFile(tokenFile) + // Automatic setup of the Kubernetes client's connection ensuring secure communication. + if sa := os.Getenv("KUBECONFIG_IN_CLUSTER"); sa == "true" { + c, err = rest.InClusterConfig() if err != nil { panic(err) } - - c.Host = os.Getenv("KUBERNETES_SERVICE_HOST") - c.BearerToken = string(token) - - if !cfg.Insecure { - caData, err = os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") - if err != nil { - fmt.Printf("Error reading certificate authority data: %v\n", err) - } else { - c.TLSClientConfig.CAData = caData - } - } } client, err := kubernetes.NewForConfig(c) @@ -109,7 +95,7 @@ func CreateSftpConfigmap() error { tempDir := cfg.System.TmpDirectory tempFile := filepath.Join(tempDir, "sftp.yaml") - data := map[string]interface{}{ + yamlData, err := yaml.Marshal(map[string]interface{}{ "debug": cfg.Debug, "token_id": cfg.AuthenticationTokenId, "token": cfg.AuthenticationToken, @@ -120,9 +106,7 @@ func CreateSftpConfigmap() error { }, "remote": cfg.PanelLocation, "remote_query": cfg.RemoteQuery, - } - - yamlData, err := yaml.Marshal(&data) + }) if err != nil { return err } diff --git a/environment/environment.go b/environment/environment.go index b7d17f48..ea756947 100644 --- a/environment/environment.go +++ b/environment/environment.go @@ -6,7 +6,7 @@ import ( "time" "github.com/kubectyl/kuber/events" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" ) const ( @@ -93,7 +93,7 @@ type ProcessEnvironment interface { ExitState() (uint32, bool, error) // Creates the pod for SFTP server. - CreateSFTP(ctx context.Context) error + CreateSFTP(ctx context.Context, cancelFunc context.CancelFunc) error // Creates the necessary environment for running the server process. For example, // in the Docker environment create will create a new container instance for the @@ -112,11 +112,11 @@ type ProcessEnvironment interface { SendCommand(string) error // Return service details - GetServiceDetails() []v1.Service + GetServiceDetails() []corev1.Service // Reads the log file for the process from the end backwards until the provided // number of lines is met. - Readlog(int) ([]string, error) + Readlog(int64) ([]string, error) // Returns the current state of the environment. 
State() string @@ -132,4 +132,6 @@ type ProcessEnvironment interface { // SetLogCallback sets the callback that the container's log output will be passed to. SetLogCallback(func([]byte)) + + ConfigurePodSpecForRestrictedStandard(*corev1.PodSpec) error } diff --git a/environment/kubernetes/environment.go b/environment/kubernetes/environment.go index e3c87965..c98edeae 100644 --- a/environment/kubernetes/environment.go +++ b/environment/kubernetes/environment.go @@ -3,7 +3,6 @@ package kubernetes import ( "context" "fmt" - "io" "math/rand" "strings" "sync" @@ -57,9 +56,6 @@ type Environment struct { // the running container instance. stream remotecommand.Executor - // Holds the stats stream used by the polling commands so that we can easily close it out. - stats io.ReadCloser - emitter *events.Bus logCallbackMx sync.Mutex @@ -208,14 +204,16 @@ func (e *Environment) CreateServiceWithUniquePort() (int, error) { } func (e *Environment) WatchPodEvents(ctx context.Context) error { + cfg := config.Get().Cluster + eventListWatcher := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fmt.Sprintf("involvedObject.kind=Pod,involvedObject.name=%s", e.Id) - return e.client.CoreV1().Events(config.Get().Cluster.Namespace).List(ctx, options) + return e.client.CoreV1().Events(cfg.Namespace).List(ctx, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { options.FieldSelector = fmt.Sprintf("involvedObject.kind=Pod,involvedObject.name=%s", e.Id) - return e.client.CoreV1().Events(config.Get().Cluster.Namespace).Watch(ctx, options) + return e.client.CoreV1().Events(cfg.Namespace).Watch(ctx, options) }, } diff --git a/environment/kubernetes/pod.go b/environment/kubernetes/pod.go index e962a795..d0147dd9 100644 --- a/environment/kubernetes/pod.go +++ b/environment/kubernetes/pod.go @@ -13,20 +13,19 @@ import ( errors2 "emperror.dev/errors" "github.com/apex/log" - "github.com/docker/docker/api/types/mount" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/remotecommand" + "k8s.io/utils/pointer" "github.com/kubectyl/kuber/config" "github.com/kubectyl/kuber/environment" "github.com/kubectyl/kuber/system" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -200,18 +199,6 @@ func (e *Environment) Create() error { resources := e.Configuration.Limits() - dnsPolicies := map[string]corev1.DNSPolicy{ - "clusterfirstwithhostnet": corev1.DNSClusterFirstWithHostNet, - "default": corev1.DNSDefault, - "none": corev1.DNSNone, - "clusterfirst": corev1.DNSClusterFirst, - } - - dnspolicy, ok := dnsPolicies[cfg.Cluster.DNSPolicy] - if !ok { - dnspolicy = corev1.DNSClusterFirst - } - imagePullPolicies := map[string]corev1.PullPolicy{ "always": corev1.PullAlways, "never": corev1.PullNever, @@ -247,10 +234,17 @@ func (e *Environment) Create() error { }, Spec: corev1.PodSpec{ SecurityContext: &corev1.PodSecurityContext{ - FSGroup: &[]int64{2000}[0], + FSGroup: pointer.Int64(2000), FSGroupChangePolicy: &fsGroupChangePolicy, + RunAsUser: pointer.Int64(1000), + RunAsNonRoot: pointer.Bool(true), }, - DNSPolicy: dnspolicy, + DNSPolicy: map[string]corev1.DNSPolicy{ + "clusterfirstwithhostnet": corev1.DNSClusterFirstWithHostNet, + "default": corev1.DNSDefault, + "none": corev1.DNSNone, + "clusterfirst": corev1.DNSClusterFirst, + 
}[cfg.Cluster.DNSPolicy], DNSConfig: &corev1.PodDNSConfig{Nameservers: config.Get().Cluster.Network.Dns}, Volumes: []corev1.Volume{ { @@ -309,10 +303,6 @@ func (e *Environment) Create() error { Protocol: corev1.Protocol("UDP"), }, }, - SecurityContext: &corev1.SecurityContext{ - AllowPrivilegeEscalation: &[]bool{false}[0], - RunAsUser: &[]int64{int64(0)}[0], - }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: *resource.NewMilliQuantity(10*resources.CpuLimit, resource.DecimalSI), @@ -373,6 +363,10 @@ func (e *Environment) Create() error { }, } + volumeMounts, volumes := e.convertMounts() + pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, volumeMounts...) + pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...) + // Disable CPU request / limit if is set to Unlimited if resources.CpuLimit == 0 { pod.Spec.Containers[0].Resources = corev1.ResourceRequirements{ @@ -428,12 +422,15 @@ func (e *Environment) Create() error { // Add a new initContainer to the Pod newInitContainer := corev1.Container{ - // Name: "replace-" + strings.ToLower(strings.ReplaceAll(k.FileName, ".", "-")), Name: "configuration-files", Image: "busybox", ImagePullPolicy: corev1.PullIfNotPresent, - Command: command, - Resources: corev1.ResourceRequirements{}, + SecurityContext: &corev1.SecurityContext{ + RunAsUser: pointer.Int64(1000), + RunAsNonRoot: pointer.Bool(true), + }, + Command: command, + Resources: corev1.ResourceRequirements{}, VolumeMounts: []corev1.VolumeMount{ { Name: "replacement", @@ -524,12 +521,11 @@ func (e *Environment) Create() error { } } - // Set the user running the container properly depending on what mode we are operating in. - securityContext := pod.Spec.Containers[0].SecurityContext - if cfg.System.User.Rootless.Enabled { - securityContext.RunAsNonRoot = &[]bool{true}[0] - securityContext.RunAsUser = &[]int64{int64(cfg.System.User.Rootless.ContainerUID)}[0] - securityContext.RunAsGroup = &[]int64{int64(cfg.System.User.Rootless.ContainerGID)}[0] + // Configure pod spec for restricted security standard + if cfg.Cluster.RestrictedPodSecurityStandard { + if err := e.ConfigurePodSpecForRestrictedStandard(&pod.Spec); err != nil { + return err + } } // Check if the services exists before we create the actual pod. 
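// Editor's note (illustrative sketch, not part of the patch): the hunks above
// replace the named dnsPolicies/imagePullPolicies lookups with inline anonymous
// map indexes. Unlike the removed `if !ok { dnspolicy = corev1.DNSClusterFirst }`
// fallback, indexing a Go map with an unrecognized key returns the zero value
// (an empty DNSPolicy / PullPolicy), so an unknown cfg.Cluster.DNSPolicy now
// relies on Kubernetes API-server defaulting rather than an explicit fallback.
// A minimal standalone example of that behaviour, assuming only corev1:
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	policies := map[string]corev1.DNSPolicy{
		"clusterfirst": corev1.DNSClusterFirst,
		"default":      corev1.DNSDefault,
		"none":         corev1.DNSNone,
	}
	// A missing key yields the zero value "" instead of corev1.DNSClusterFirst.
	fmt.Printf("%q\n", policies["not-a-real-policy"]) // prints ""
}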
@@ -729,32 +725,9 @@ func (e *Environment) CreateService() error { return nil } -func (e *Environment) CreateSFTP(ctx context.Context) error { +func (e *Environment) CreateSFTP(ctx context.Context, cancelFunc context.CancelFunc) error { cfg := config.Get() - dnsPolicies := map[string]corev1.DNSPolicy{ - "clusterfirstwithhostnet": corev1.DNSClusterFirstWithHostNet, - "default": corev1.DNSDefault, - "none": corev1.DNSNone, - "clusterfirst": corev1.DNSClusterFirst, - } - - dnspolicy, ok := dnsPolicies[cfg.Cluster.DNSPolicy] - if !ok { - dnspolicy = corev1.DNSClusterFirst - } - - imagePullPolicies := map[string]corev1.PullPolicy{ - "always": corev1.PullAlways, - "never": corev1.PullNever, - "ifnotpresent": corev1.PullIfNotPresent, - } - - imagepullpolicy, ok := imagePullPolicies[cfg.Cluster.ImagePullPolicy] - if !ok { - imagepullpolicy = corev1.PullIfNotPresent - } - fsGroupChangePolicy := corev1.FSGroupChangeOnRootMismatch pod := &corev1.Pod{ @@ -773,10 +746,17 @@ func (e *Environment) CreateSFTP(ctx context.Context) error { }, Spec: corev1.PodSpec{ SecurityContext: &corev1.PodSecurityContext{ - FSGroup: &[]int64{2000}[0], + FSGroup: pointer.Int64(2000), FSGroupChangePolicy: &fsGroupChangePolicy, + RunAsUser: pointer.Int64(1000), + RunAsNonRoot: pointer.Bool(true), }, - DNSPolicy: dnspolicy, + DNSPolicy: map[string]corev1.DNSPolicy{ + "clusterfirstwithhostnet": corev1.DNSClusterFirstWithHostNet, + "default": corev1.DNSDefault, + "none": corev1.DNSNone, + "clusterfirst": corev1.DNSClusterFirst, + }[cfg.Cluster.DNSPolicy], DNSConfig: &corev1.PodDNSConfig{Nameservers: config.Get().Cluster.Network.Dns}, Volumes: []corev1.Volume{ { @@ -805,12 +785,22 @@ func (e *Environment) CreateSFTP(ctx context.Context) error { }, }, }, + { + Name: "logs", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, }, Containers: []corev1.Container{ { - Name: "sftp-server", - Image: cfg.System.Sftp.SftpImage, - ImagePullPolicy: imagepullpolicy, + Name: "sftp-server", + Image: cfg.System.Sftp.SftpImage, + ImagePullPolicy: map[string]corev1.PullPolicy{ + "always": corev1.PullAlways, + "never": corev1.PullNever, + "ifnotpresent": corev1.PullIfNotPresent, + }[cfg.Cluster.ImagePullPolicy], Env: []corev1.EnvVar{ { Name: "P_SERVER_UUID", @@ -837,6 +827,10 @@ func (e *Environment) CreateSFTP(ctx context.Context) error { ReadOnly: true, MountPath: path.Join(cfg.System.Data, ".sftp"), }, + { + Name: "logs", + MountPath: cfg.System.LogDirectory, + }, }, }, }, @@ -844,6 +838,13 @@ func (e *Environment) CreateSFTP(ctx context.Context) error { }, } + // Configure pod spec for restricted security standard + if cfg.Cluster.RestrictedPodSecurityStandard { + if err := e.ConfigurePodSpecForRestrictedStandard(&pod.Spec); err != nil { + return err + } + } + if len(e.Configuration.NodeSelectors()) > 0 { pod.Spec.NodeSelector = map[string]string{} @@ -866,15 +867,17 @@ func (e *Environment) CreateSFTP(ctx context.Context) error { if err != nil { return err } - if pod.Status.Phase != v1.PodRunning { - var zero int64 = 0 + if pod.Status.Phase != corev1.PodRunning { policy := metav1.DeletePropagationForeground - if err := e.client.CoreV1().Pods(config.Get().Cluster.Namespace).Delete(ctx, e.Id, metav1.DeleteOptions{GracePeriodSeconds: &zero, PropagationPolicy: &policy}); err != nil { + if err := e.client.CoreV1().Pods(config.Get().Cluster.Namespace).Delete(ctx, e.Id, metav1.DeleteOptions{ + GracePeriodSeconds: pointer.Int64(0), + PropagationPolicy: &policy, + }); err != nil { return err } - return 
e.CreateSFTP(ctx) + return e.CreateSFTP(ctx, cancelFunc) } } else { e.log().WithField("error", err).Warn("environment/kubernetes: failed to create SFTP pod") @@ -888,11 +891,11 @@ func (e *Environment) CreateSFTP(ctx context.Context) error { } switch pod.Status.Phase { - case v1.PodPending: + case corev1.PodPending: return false, nil - case v1.PodRunning: + case corev1.PodRunning: return true, nil - case v1.PodFailed, v1.PodSucceeded: + case corev1.PodFailed, corev1.PodSucceeded: return false, fmt.Errorf("pod ran to completion") default: return false, fmt.Errorf("unknown pod status") @@ -918,7 +921,6 @@ func (e *Environment) Destroy() error { // We set it to stopping than offline to prevent crash detection from being triggered. e.SetState(environment.ProcessStoppingState) - var zero int64 = 0 policy := metav1.DeletePropagationForeground // Loop through services with service Kubectyl and server UUID and delete them @@ -929,7 +931,10 @@ func (e *Environment) Destroy() error { return err } else { for _, s := range services.Items { - err := e.client.CoreV1().Services(config.Get().Cluster.Namespace).Delete(ctx, s.Name, metav1.DeleteOptions{}) + err := e.client.CoreV1().Services(config.Get().Cluster.Namespace).Delete(ctx, s.Name, metav1.DeleteOptions{ + GracePeriodSeconds: pointer.Int64(30), + PropagationPolicy: &policy, + }) if err != nil { return err } @@ -938,7 +943,7 @@ func (e *Environment) Destroy() error { // Delete configmaps err = e.client.CoreV1().ConfigMaps(config.Get().Cluster.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{ - GracePeriodSeconds: &zero, + GracePeriodSeconds: pointer.Int64(30), PropagationPolicy: &policy, }, metav1.ListOptions{ LabelSelector: fmt.Sprintf("Service=Kubectyl,uuid=%s", e.Id), @@ -949,7 +954,7 @@ func (e *Environment) Destroy() error { // Delete pods err = e.client.CoreV1().Pods(config.Get().Cluster.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{ - GracePeriodSeconds: &zero, + GracePeriodSeconds: pointer.Int64(30), PropagationPolicy: &policy, }, metav1.ListOptions{ LabelSelector: fmt.Sprintf("Service=Kubectyl,uuid=%s", e.Id), @@ -960,7 +965,7 @@ func (e *Environment) Destroy() error { // Delete pvc err = e.client.CoreV1().PersistentVolumeClaims(config.Get().Cluster.Namespace).Delete(ctx, e.Id+"-pvc", metav1.DeleteOptions{ - GracePeriodSeconds: &zero, + GracePeriodSeconds: pointer.Int64(30), PropagationPolicy: &policy, }) if err != nil && !errors.IsNotFound(err) { @@ -1013,10 +1018,10 @@ func (e *Environment) SendCommand(c string) error { // Readlog reads the log file for the server. This does not care if the server // is running or not, it will simply try to read the last X bytes of the file // and return them. 
-func (e *Environment) Readlog(lines int) ([]string, error) { +func (e *Environment) Readlog(lines int64) ([]string, error) { r := e.client.CoreV1().Pods(config.Get().Cluster.Namespace).GetLogs(e.Id, &corev1.PodLogOptions{ Container: "process", - TailLines: &[]int64{int64(lines)}[0], + TailLines: pointer.Int64(lines), }) podLogs, err := r.Stream(context.Background()) if err != nil { @@ -1033,17 +1038,26 @@ func (e *Environment) Readlog(lines int) ([]string, error) { return out, nil } -func (e *Environment) convertMounts() []mount.Mount { - var out []mount.Mount +func (e *Environment) convertMounts() ([]corev1.VolumeMount, []corev1.Volume) { + var out []corev1.VolumeMount + var volumes []corev1.Volume - for _, m := range e.Configuration.Mounts() { - out = append(out, mount.Mount{ - Type: mount.TypeBind, - Source: m.Source, - Target: m.Target, - ReadOnly: m.ReadOnly, + for i, m := range e.Configuration.Mounts() { + out = append(out, corev1.VolumeMount{ + Name: fmt.Sprintf("volume-%d", i), + MountPath: m.Target, + ReadOnly: m.ReadOnly, + }) + + volumes = append(volumes, corev1.Volume{ + Name: fmt.Sprintf("volume-%d", i), + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: m.Source, + }, + }, }) } - return out + return out, volumes } diff --git a/environment/kubernetes/securitycontext.go b/environment/kubernetes/securitycontext.go new file mode 100644 index 00000000..d9abb3b9 --- /dev/null +++ b/environment/kubernetes/securitycontext.go @@ -0,0 +1,50 @@ +package kubernetes + +import ( + "emperror.dev/errors" + + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/pointer" + + "github.com/imdario/mergo" +) + +func (e *Environment) ConfigurePodSpecForRestrictedStandard(podSpec *corev1.PodSpec) error { + podSecurityContext := corev1.PodSpec{ + SecurityContext: &corev1.PodSecurityContext{ + RunAsNonRoot: pointer.Bool(true), + SeccompProfile: &corev1.SeccompProfile{ + Type: corev1.SeccompProfileTypeRuntimeDefault, + }, + }, + } + + containerSecurityContext := corev1.Container{ + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: pointer.Bool(false), + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + }, + } + + for i, container := range podSpec.Containers { + if err := mergo.Merge(&container, containerSecurityContext, mergo.WithOverride); err != nil { + return err + } + podSpec.Containers[i] = container + } + + for i, initContainer := range podSpec.InitContainers { + if err := mergo.Merge(&initContainer, containerSecurityContext, mergo.WithOverride); err != nil { + return err + } + podSpec.InitContainers[i] = initContainer + } + + if err := mergo.Merge(podSpec, podSecurityContext, mergo.WithOverride); err != nil { + return errors.Wrap(err, "failed to merge pod security context") + } + + return nil +} diff --git a/go.mod b/go.mod index 7862de75..9e9fb067 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( require ( github.com/Microsoft/go-winio v0.6.0 // indirect github.com/Microsoft/hcsshim v0.9.6 // indirect + github.com/ViaQ/logerr/v2 v2.1.0 github.com/andybalholm/brotli v1.0.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect @@ -104,6 +105,8 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/go-version v1.6.0 + github.com/imdario/mergo v0.3.16 github.com/inconshreveable/mousetrap v1.1.0 // indirect 
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect diff --git a/go.sum b/go.sum index a75412c5..aaa2886b 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= +github.com/ViaQ/logerr/v2 v2.1.0 h1:8WwzuNa1x+a6tRUl+6sFel83A/QxlFBUaFW2FyG2zzY= +github.com/ViaQ/logerr/v2 v2.1.0/go.mod h1:/qoWLm3YG40Sv5u75s4fvzjZ5p36xINzaxU2L+DJ9uw= github.com/acobaugh/osrelease v0.1.0 h1:Yb59HQDGGNhCj4suHaFQQfBps5wyoKLSSX/J/+UifRE= github.com/acobaugh/osrelease v0.1.0/go.mod h1:4bFEs0MtgHNHBrmHCt67gNisnabCRAlzdVasCEGHTWY= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -520,6 +522,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= @@ -537,6 +541,8 @@ github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= +github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= diff --git a/router/router_server.go b/router/router_server.go index bf56ad5e..4a6665f2 100644 --- a/router/router_server.go +++ b/router/router_server.go @@ -32,7 +32,7 @@ func getServerLogs(c *gin.Context) { l = 100 } - out, err := s.ReadLogfile(l) + out, err := s.ReadLogfile(int64(l)) if err != nil { middleware.CaptureAndAbort(c, err) return diff --git a/router/router_server_backup.go b/router/router_server_backup.go index 14ebc1ed..bb9e6950 100644 --- a/router/router_server_backup.go +++ b/router/router_server_backup.go @@ -1,6 +1,7 @@ package router import ( + "context" "net/http" "os" @@ -26,7 +27,6 @@ func postServerBackup(c *gin.Context) { var data struct { Adapter snapshot.AdapterType `json:"adapter"` Uuid string `json:"uuid"` - 
Ignore string `json:"ignore"` } if err := c.BindJSON(&data); err != nil { return @@ -46,7 +46,7 @@ func postServerBackup(c *gin.Context) { return } - adapter := snapshot.NewLocal(snapshotClient, client, clientset, data.Uuid, data.Ignore) + adapter := snapshot.NewLocal(snapshotClient, client, clientset, data.Uuid) // Attach the server ID and the request ID to the adapter log context for easier // parsing in the logs. @@ -64,12 +64,11 @@ func postServerBackup(c *gin.Context) { c.Status(http.StatusAccepted) } -// postServerRestoreBackup handles restoring a backup for a server by downloading -// or finding the given backup on the system and then unpacking the archive into -// the server's data directory. If the TruncateDirectory field is provided and -// is true all of the files will be deleted for the server. +// postServerRestoreBackup handles restoring a snapshot for a server by downloading +// or finding the given snapshot on the system and then unpacking the archive into +// the server's data directory. // -// This endpoint will block until the backup is fully restored allowing for a +// This endpoint will block until the snapshot is fully restored allowing for a // spinner to be displayed in the Panel UI effectively. // // TODO: stop the server if it is running @@ -79,8 +78,7 @@ func postServerRestoreBackup(c *gin.Context) { logger := middleware.ExtractLogger(c) var data struct { - Adapter snapshot.AdapterType `binding:"required,oneof=kuber s3" json:"adapter"` - TruncateDirectory bool `json:"truncate_directory"` + Adapter snapshot.AdapterType `binding:"required,oneof=kuber s3" json:"adapter"` // A UUID is always required for this endpoint, however the download URL // is only present when the given adapter type is s3. DownloadUrl string `json:"download_url"` @@ -99,14 +97,7 @@ func postServerRestoreBackup(c *gin.Context) { s.SetRestoring(false) }() - logger.Info("processing server backup restore request") - if data.TruncateDirectory { - logger.Info("received \"truncate_directory\" flag in request: deleting server files") - if err := s.Filesystem().TruncateRootDirectory(); err != nil { - middleware.CaptureAndAbort(c, err) - return - } - } + logger.Info("processing server snapshot restore request") // Now that we've cleaned up the data directory if necessary, grab the snapshot file // and attempt to restore it into the server directory. @@ -139,6 +130,11 @@ func postServerRestoreBackup(c *gin.Context) { s.Events().Publish(server.BackupRestoreCompletedEvent, "") logger.Info("completed server restoration from local snapshot") s.SetRestoring(false) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.Environment.CreateSFTP(ctx, cancel) }(s, b, logger) hasError = false c.Status(http.StatusAccepted) diff --git a/router/router_system.go b/router/router_system.go index ae8657e6..25019bc0 100644 --- a/router/router_system.go +++ b/router/router_system.go @@ -26,34 +26,35 @@ import ( // Returns information about the system that kuber is running on. 
func getSystemInformation(c *gin.Context) { - i, err := system.GetSystemInformation() + // i, err := system.GetSystemInformation(environment) + // if err != nil { + // middleware.CaptureAndAbort(c, err) + // return + // } + + // if c.Query("v") == "2" { + // c.JSON(http.StatusOK, i) + // return + // } + + cfg, clientset, err := environment.Cluster() if err != nil { middleware.CaptureAndAbort(c, err) return } - if c.Query("v") == "2" { - c.JSON(http.StatusOK, i) - return - } - - // cfg := config.Get().Cluster - - config, _, err := environment.Cluster() + i, err := system.GetSystemInformation(clientset, config.Get().Cluster.Namespace) if err != nil { middleware.CaptureAndAbort(c, err) return } - // config := &rest.Config{ - // Host: cfg.Host, - // BearerToken: cfg.BearerToken, - // TLSClientConfig: rest.TLSClientConfig{ - // Insecure: cfg.Insecure, - // }, - // } + if c.Query("v") == "2" { + c.JSON(http.StatusOK, i) + return + } - discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) if err != nil { middleware.CaptureAndAbort(c, err) return @@ -148,7 +149,10 @@ func postCreateServer(c *gin.Context) { } } } else { - if err := i.Server().Environment.CreateSFTP(i.Server().Context()); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := i.Server().Environment.CreateSFTP(ctx, cancel); err != nil { log.WithFields(log.Fields{"server_id": i.Server().ID(), "error": err}).Error("encountered error processing server SFTP process") } log.WithField("server_id", i.Server().ID()).Debug("skipping automatic start after successful server installation") diff --git a/server/filesystem/compress.go b/server/filesystem/compress.go index 55be107a..196f7ba8 100644 --- a/server/filesystem/compress.go +++ b/server/filesystem/compress.go @@ -23,8 +23,8 @@ import ( "github.com/ulikunitz/xz" ) -func (fs *Filesystem) createArchive(connection *SFTPConn, cleaned []string, archivePath string) error { - archiveFile, err := connection.sftpClient.Create(archivePath) +func (fs *Filesystem) createArchive(cleaned []string, archivePath string) error { + archiveFile, err := fs.manager.Create(archivePath) if err != nil { return fmt.Errorf("failed to create archive file: %v", err) } @@ -39,21 +39,21 @@ func (fs *Filesystem) createArchive(connection *SFTPConn, cleaned []string, arch defer tarWriter.Close() for _, remoteFile := range cleaned { - fileInfo, err := connection.sftpClient.Stat(remoteFile) + fileInfo, err := fs.manager.Stat(remoteFile) if err != nil { log.Printf("Failed to retrieve file info for %q: %v", remoteFile, err) continue } if fileInfo.IsDir() { - err = fs.addDirectoryToArchive(connection.sftpClient, remoteFile, tarWriter) + err = fs.addDirectoryToArchive(remoteFile, tarWriter) if err != nil { log.Printf("Failed to add directory %q to archive: %v", remoteFile, err) } continue } - remoteReader, err := connection.sftpClient.Open(remoteFile) + remoteReader, err := fs.manager.Open(remoteFile) if err != nil { log.Printf("Failed to open remote file %q: %v", remoteFile, err) continue @@ -79,8 +79,8 @@ func (fs *Filesystem) createArchive(connection *SFTPConn, cleaned []string, arch return nil } -func (fs *Filesystem) addDirectoryToArchive(sftpClient *sftp.Client, remoteDir string, archiveWriter *tar.Writer) error { - entries, err := sftpClient.ReadDir(remoteDir) +func (fs *Filesystem) addDirectoryToArchive(remoteDir string, archiveWriter *tar.Writer) error { + entries, err := 
fs.manager.ReadDir(remoteDir) if err != nil { return fmt.Errorf("failed to read directory %q: %v", remoteDir, err) } @@ -89,14 +89,14 @@ func (fs *Filesystem) addDirectoryToArchive(sftpClient *sftp.Client, remoteDir s remotePath := filepath.Join(remoteDir, entry.Name()) if entry.IsDir() { - err = fs.addDirectoryToArchive(sftpClient, remotePath, archiveWriter) + err = fs.addDirectoryToArchive(remotePath, archiveWriter) if err != nil { return err } continue } - remoteReader, err := sftpClient.Open(remotePath) + remoteReader, err := fs.manager.Open(remotePath) if err != nil { log.Printf("Failed to open remote file %q: %v", remotePath, err) continue @@ -147,17 +147,12 @@ func (fs *Filesystem) CompressFiles(dir string, paths []string) (os.FileInfo, er return nil, err } - connection, err := fs.manager.GetConnection() - if err != nil { - return nil, err - } - d := path.Join( cleanedRootDir, fmt.Sprintf("archive-%s.tar.gz", strings.ReplaceAll(time.Now().Format(time.RFC3339), ":", "")), ) - if err := fs.createArchive(connection, cleaned, d); err != nil { + if err := fs.createArchive(cleaned, d); err != nil { return nil, err } @@ -187,13 +182,8 @@ func (fs *Filesystem) SpaceAvailableForDecompression(ctx context.Context, dir st // waiting an unnecessary amount of time on this call. dirSize, _ := fs.DiskUsage(false) - connection, err := fs.manager.GetConnection() - if err != nil { - return err - } - var size int64 - err = walkDirSFTP(connection.sftpClient, ".", func(path string, fileInfo os.FileInfo, err error) error { + err := fs.walkDirSFTP(".", func(path string, fileInfo os.FileInfo, err error) error { if err != nil { return err } @@ -224,8 +214,8 @@ func (fs *Filesystem) SpaceAvailableForDecompression(ctx context.Context, dir st type SFTPWalkerFunc func(path string, fileInfo os.FileInfo, err error) error -func walkDirSFTP(client *sftp.Client, dirPath string, fn SFTPWalkerFunc) error { - entries, err := client.ReadDir(dirPath) +func (fs *Filesystem) walkDirSFTP(dirPath string, fn SFTPWalkerFunc) error { + entries, err := fs.manager.ReadDir(dirPath) if err != nil { return err } @@ -234,7 +224,7 @@ func walkDirSFTP(client *sftp.Client, dirPath string, fn SFTPWalkerFunc) error { subPath := filepath.Join(dirPath, entry.Name()) if entry.IsDir() { - err := walkDirSFTP(client, subPath, fn) + err := fs.walkDirSFTP(subPath, fn) if err != nil { if err := fn(subPath, entry, err); err != nil && err != filepath.SkipDir { return err @@ -273,12 +263,7 @@ func (fs *Filesystem) DecompressFileUnsafe(ctx context.Context, dir string, file return errors.WithStack(err) } - connection, err := fs.manager.GetConnection() - if err != nil { - return err - } - - f, err := connection.sftpClient.Open(file) + f, err := fs.manager.Open(file) if err != nil { return err } @@ -298,25 +283,25 @@ func (fs *Filesystem) DecompressFileUnsafe(ctx context.Context, dir string, file switch mime.String() { case "application/vnd.rar", "application/x-rar-compressed": - return extractRARArchive(connection.sftpClient, file, dir) + return fs.extractRARArchive(file, dir) case "application/x-tar", "application/x-br", "application/x-lzip", "application/x-sz", "application/zstd": - return extractTARArchive(connection.sftpClient, file, dir) + return fs.extractTARArchive(file, dir) case "application/x-xz": - return extractTARXZArchive(connection.sftpClient, file, dir) + return fs.extractTARXZArchive(file, dir) case "application/x-bzip2": - return extractBZIP2Archive(connection.sftpClient, file, dir) + return fs.extractBZIP2Archive(file, dir) case 
"application/gzip", "application/x-gzip": - return extractGZArchive(connection.sftpClient, file, dir) + return fs.extractGZArchive(file, dir) case "application/zip": - return extractZIPArchive(connection.sftpClient, file, dir) + return fs.extractZIPArchive(file, dir) default: return fmt.Errorf("unsupported archive format: %s", mime.String()) } } // Extract RAR archive file from the remote SFTP server. -func extractRARArchive(sftpClient *sftp.Client, remoteFilePath string, destinationDir string) error { - remoteFile, err := sftpClient.Open(remoteFilePath) +func (fs *Filesystem) extractRARArchive(remoteFilePath string, destinationDir string) error { + remoteFile, err := fs.manager.Open(remoteFilePath) if err != nil { return fmt.Errorf("failed to open remote file: %v", err) } @@ -338,7 +323,7 @@ func extractRARArchive(sftpClient *sftp.Client, remoteFilePath string, destinati if header.IsDir { destDir := filepath.Join(destinationDir, header.Name) - err := sftpClient.MkdirAll(destDir) + err := fs.manager.MkdirAll(destDir) if err != nil { return fmt.Errorf("failed to create directory: %v", err) } @@ -346,7 +331,7 @@ func extractRARArchive(sftpClient *sftp.Client, remoteFilePath string, destinati } destFilePath := filepath.Join(destinationDir, header.Name) - destFile, err := sftpClient.Create(destFilePath) + destFile, err := fs.manager.Create(destFilePath) if err != nil { return fmt.Errorf("failed to create file: %v", err) } @@ -362,8 +347,8 @@ func extractRARArchive(sftpClient *sftp.Client, remoteFilePath string, destinati } // Extract TAR archive file from the remote SFTP server. -func extractTARArchive(sftpClient *sftp.Client, remoteFilePath string, destinationDir string) error { - remoteFile, err := sftpClient.Open(remoteFilePath) +func (fs *Filesystem) extractTARArchive(remoteFilePath string, destinationDir string) error { + remoteFile, err := fs.manager.Open(remoteFilePath) if err != nil { return fmt.Errorf("failed to open remote file: %v", err) } @@ -384,12 +369,12 @@ func extractTARArchive(sftpClient *sftp.Client, remoteFilePath string, destinati switch header.Typeflag { case tar.TypeDir: - err := sftpClient.MkdirAll(destFilePath) + err := fs.manager.MkdirAll(destFilePath) if err != nil { return fmt.Errorf("failed to create directory: %v", err) } case tar.TypeReg: - destFile, err := sftpClient.Create(destFilePath) + destFile, err := fs.manager.Create(destFilePath) if err != nil { return fmt.Errorf("failed to create file: %v", err) } @@ -408,8 +393,8 @@ func extractTARArchive(sftpClient *sftp.Client, remoteFilePath string, destinati } // Extract GZ archive file from the remote SFTP server. 
-func extractGZArchive(sftpClient *sftp.Client, remoteFilePath string, destinationDir string) error { - remoteFile, err := sftpClient.Open(remoteFilePath) +func (fs *Filesystem) extractGZArchive(remoteFilePath string, destinationDir string) error { + remoteFile, err := fs.manager.Open(remoteFilePath) if err != nil { return fmt.Errorf("failed to open remote file: %v", err) } @@ -436,12 +421,12 @@ func extractGZArchive(sftpClient *sftp.Client, remoteFilePath string, destinatio switch header.Typeflag { case tar.TypeDir: - err := sftpClient.MkdirAll(destFilePath) + err := fs.manager.MkdirAll(destFilePath) if err != nil { return fmt.Errorf("failed to create directory: %v", err) } case tar.TypeReg: - destFile, err := sftpClient.Create(destFilePath) + destFile, err := fs.manager.Create(destFilePath) if err != nil { return fmt.Errorf("failed to create file: %v", err) } @@ -460,8 +445,8 @@ func extractGZArchive(sftpClient *sftp.Client, remoteFilePath string, destinatio } // Extract TAR.XZ archive file from the remote SFTP server. -func extractTARXZArchive(sftpClient *sftp.Client, remoteFilePath string, destinationDir string) error { - remoteFile, err := sftpClient.Open(remoteFilePath) +func (fs *Filesystem) extractTARXZArchive(remoteFilePath string, destinationDir string) error { + remoteFile, err := fs.manager.Open(remoteFilePath) if err != nil { return fmt.Errorf("failed to open remote file: %v", err) } @@ -487,12 +472,12 @@ func extractTARXZArchive(sftpClient *sftp.Client, remoteFilePath string, destina switch header.Typeflag { case tar.TypeDir: - err := sftpClient.MkdirAll(destFilePath) + err := fs.manager.MkdirAll(destFilePath) if err != nil { return fmt.Errorf("failed to create directory: %v", err) } case tar.TypeReg: - destFile, err := sftpClient.Create(destFilePath) + destFile, err := fs.manager.Create(destFilePath) if err != nil { return fmt.Errorf("failed to create file: %v", err) } @@ -511,8 +496,8 @@ func extractTARXZArchive(sftpClient *sftp.Client, remoteFilePath string, destina } // Extract BZIP2 archive file from the remote SFTP server. -func extractBZIP2Archive(sftpClient *sftp.Client, remoteFilePath string, destinationDir string) error { - remoteFile, err := sftpClient.Open(remoteFilePath) +func (fs *Filesystem) extractBZIP2Archive(remoteFilePath string, destinationDir string) error { + remoteFile, err := fs.manager.Open(remoteFilePath) if err != nil { return fmt.Errorf("failed to open remote file: %v", err) } @@ -534,12 +519,12 @@ func extractBZIP2Archive(sftpClient *sftp.Client, remoteFilePath string, destina switch header.Typeflag { case tar.TypeDir: - err := sftpClient.MkdirAll(destFilePath) + err := fs.manager.MkdirAll(destFilePath) if err != nil { return fmt.Errorf("failed to create directory: %v", err) } case tar.TypeReg: - destFile, err := sftpClient.Create(destFilePath) + destFile, err := fs.manager.Create(destFilePath) if err != nil { return fmt.Errorf("failed to create file: %v", err) } @@ -558,14 +543,14 @@ func extractBZIP2Archive(sftpClient *sftp.Client, remoteFilePath string, destina } // Extract ZIP archive file from the remote SFTP server. 
-func extractZIPArchive(sftpClient *sftp.Client, remoteFilePath string, destinationDir string) error { - remoteFile, err := sftpClient.Open(remoteFilePath) +func (fs *Filesystem) extractZIPArchive(remoteFilePath string, destinationDir string) error { + remoteFile, err := fs.manager.Open(remoteFilePath) if err != nil { return fmt.Errorf("failed to open remote file: %v", err) } defer remoteFile.Close() - remoteFileInfo, err := sftpClient.Stat(remoteFilePath) + remoteFileInfo, err := fs.manager.Stat(remoteFilePath) if err != nil { return err } @@ -579,20 +564,20 @@ func extractZIPArchive(sftpClient *sftp.Client, remoteFilePath string, destinati destFilePath := filepath.Join(destinationDir, file.Name) if file.FileInfo().IsDir() { - err := sftpClient.MkdirAll(destFilePath) + err := fs.manager.MkdirAll(destFilePath) if err != nil { return fmt.Errorf("failed to create directory: %v", err) } continue } - destFile, err := sftpClient.Create(destFilePath) + destFile, err := fs.manager.Create(destFilePath) if err != nil { return fmt.Errorf("failed to create file: %v", err) } defer destFile.Close() - srcFile, err := sftpClient.Open(file.Name) + srcFile, err := fs.manager.Open(file.Name) if err != nil { return fmt.Errorf("failed to open file inside ZIP: %v", err) } diff --git a/server/filesystem/filesystem.go b/server/filesystem/filesystem.go index 186164a3..1618c74f 100644 --- a/server/filesystem/filesystem.go +++ b/server/filesystem/filesystem.go @@ -83,9 +83,10 @@ func New(root string, size int64, denylist []string, addr string) *Filesystem { } } -func (fs *Filesystem) SetManager(addr string) error { +func (fs *Filesystem) SetManager(addr string) { + fs.mu.Lock() fs.manager = NewBasicSFTPManager(addr, clientConfig) - return nil + fs.mu.Unlock() } // Path returns the root path for the Filesystem instance. @@ -99,12 +100,7 @@ func (fs *Filesystem) File(p string) (*sftp.File, os.FileInfo, error) { if err != nil { return nil, Stat{}, errors.WithStackIf(err) } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return nil, nil, err - } - st, err := connection.sftpClient.Stat(cleaned) + st, err := fs.manager.Stat(cleaned) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, Stat{}, newFilesystemError(ErrNotExist, err) @@ -114,7 +110,7 @@ func (fs *Filesystem) File(p string) (*sftp.File, os.FileInfo, error) { if st.IsDir() { return nil, Stat{}, newFilesystemError(ErrCodeIsDirectory, nil) } - f, err := connection.sftpClient.Open(cleaned) + f, err := fs.manager.Open(cleaned) if err != nil { return nil, Stat{}, errors.WithStackIf(err) } @@ -130,13 +126,7 @@ func (fs *Filesystem) Touch(p string, flag int) (*sftp.File, error) { return nil, err } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return nil, err - } - - f, err := connection.sftpClient.OpenFile(cleaned, flag) + f, err := fs.manager.OpenFile(cleaned, flag) if err == nil { return f, nil } @@ -145,10 +135,10 @@ func (fs *Filesystem) Touch(p string, flag int) (*sftp.File, error) { return nil, errors.Wrap(err, "server/filesystem: touch: failed to open file handle") } // Only create and chown the directory if it doesn't exist. - if _, err := connection.sftpClient.Stat(filepath.Dir(cleaned)); errors.Is(err, os.ErrNotExist) { + if _, err := fs.manager.Stat(filepath.Dir(cleaned)); errors.Is(err, os.ErrNotExist) { // Create the path leading up to the file we're trying to create, setting the final perms // on it as we go. 
- if err := connection.sftpClient.MkdirAll(filepath.Dir(cleaned)); err != nil { + if err := fs.manager.MkdirAll(filepath.Dir(cleaned)); err != nil { return nil, errors.Wrap(err, "server/filesystem: touch: failed to create directory tree") } if err := fs.Chown(filepath.Dir(cleaned)); err != nil { @@ -175,16 +165,10 @@ func (fs *Filesystem) Writefile(p string, r io.Reader) error { return err } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - var currentSize int64 // If the file does not exist on the system already go ahead and create the pathway // to it and an empty file. We'll then write to it later on after this completes. - stat, err := connection.sftpClient.Stat(cleaned) + stat, err := fs.manager.Stat(cleaned) if err != nil && !os.IsNotExist(err) { return errors.Wrap(err, "server/filesystem: writefile: failed to stat file") } else if err == nil { @@ -226,12 +210,7 @@ func (fs *Filesystem) CreateDirectory(name string, p string) error { if err != nil { return err } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - return connection.sftpClient.MkdirAll(cleaned) + return fs.manager.MkdirAll(cleaned) } // Rename moves (or renames) a file or directory. @@ -246,15 +225,9 @@ func (fs *Filesystem) Rename(from string, to string) error { return errors.WithStack(err) } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - // If the target file or directory already exists the rename function will fail, so just // bail out now. - if _, err := connection.sftpClient.Stat(cleanedTo); err == nil { + if _, err := fs.manager.Stat(cleanedTo); err == nil { return os.ErrExist } @@ -266,12 +239,12 @@ func (fs *Filesystem) Rename(from string, to string) error { // Ensure that the directory we're moving into exists correctly on the system. Only do this if // we're not at the root directory level. if d != fs.Path() { - if mkerr := connection.sftpClient.MkdirAll(d); mkerr != nil { + if mkerr := fs.manager.MkdirAll(d); mkerr != nil { return errors.WithMessage(mkerr, "failed to create directory structure for file rename") } } - if err := connection.sftpClient.Rename(cleanedFrom, cleanedTo); err != nil { + if err := fs.manager.Rename(cleanedFrom, cleanedTo); err != nil { return errors.WithStack(err) } return nil @@ -291,23 +264,14 @@ func (fs *Filesystem) Chown(path string) error { return nil } - uid := config.Get().System.User.Uid - gid := config.Get().System.User.Gid - - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - // Start by just chowning the initial path that we received. - if err := connection.sftpClient.Chown(cleaned, uid, gid); err != nil { + if err := fs.manager.Chown(cleaned); err != nil { return errors.Wrap(err, "server/filesystem: chown: failed to chown path") } // If this is not a directory we can now return from the function, there is nothing // left that we need to do. 
- if st, err := connection.sftpClient.Stat(cleaned); err != nil || !st.IsDir() { + if st, err := fs.manager.Stat(cleaned); err != nil || !st.IsDir() { return nil } @@ -327,7 +291,7 @@ func (fs *Filesystem) Chown(path string) error { return nil } - return connection.sftpClient.Chown(p, uid, gid) + return fs.manager.Chown(p) }, }) @@ -345,13 +309,7 @@ func (fs *Filesystem) Chmod(path string, mode os.FileMode) error { return nil } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - - if err := connection.sftpClient.Chmod(cleaned, mode); err != nil { + if err := fs.manager.Chmod(cleaned, mode); err != nil { return err } @@ -375,16 +333,10 @@ func (fs *Filesystem) findCopySuffix(dir string, name string, extension string) suffix = " copy " + strconv.Itoa(i) } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return "", err - } - n := name + suffix + extension // If we stat the file and it does not exist that means we're good to create the copy. If it // does exist, we'll just continue to the next loop and try again. - if _, err := connection.sftpClient.Stat(path.Join(dir, n)); err != nil { + if _, err := fs.manager.Stat(path.Join(dir, n)); err != nil { if !errors.Is(err, os.ErrNotExist) { return "", err } @@ -408,13 +360,7 @@ func (fs *Filesystem) Copy(p string) error { return err } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - - s, err := connection.sftpClient.Stat(cleaned) + s, err := fs.manager.Stat(cleaned) if err != nil { return err } else if s.IsDir() || !s.Mode().IsRegular() { @@ -441,7 +387,7 @@ func (fs *Filesystem) Copy(p string) error { name = strings.TrimSuffix(name, ".tar") } - source, err := connection.sftpClient.Open(cleaned) + source, err := fs.manager.Open(cleaned) if err != nil { return err } @@ -458,15 +404,10 @@ func (fs *Filesystem) Copy(p string) error { // TruncateRootDirectory removes _all_ files and directories from a server's // data directory and resets the used disk space to zero. 
func (fs *Filesystem) TruncateRootDirectory() error { - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - if err := connection.sftpClient.Remove(fs.Path()); err != nil { + if err := fs.manager.Remove(fs.Path()); err != nil { return err } - if err := connection.sftpClient.Mkdir(fs.Path()); err != nil { + if err := fs.manager.Mkdir(fs.Path()); err != nil { return err } atomic.StoreInt64(&fs.diskUsed, 0) @@ -496,13 +437,7 @@ func (fs *Filesystem) Delete(p string) error { return errors.New("cannot delete root server directory") } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - - if st, err := connection.sftpClient.Lstat(resolved); err != nil { + if st, err := fs.manager.Lstat(resolved); err != nil { if !os.IsNotExist(err) { fs.error(err).Warn("error while attempting to stat file before deletion") } @@ -524,36 +459,31 @@ func (fs *Filesystem) Delete(p string) error { wg.Wait() - return connection.sftpClient.Remove(resolved) + return fs.manager.Remove(resolved) } func (fs *Filesystem) deleteRecursive(name string) error { - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - entries, err := connection.sftpClient.ReadDir(name) + entries, err := fs.manager.ReadDir(name) if err != nil { return errors.Wrap(err, "ReadDir") } if len(entries) == 0 { - err = connection.sftpClient.RemoveDirectory(name) + err = fs.manager.RemoveDirectory(name) if err != nil { return errors.Wrap(err, "RemoveDirectory") } } for _, fi := range entries { - itemName := connection.sftpClient.Join(name, fi.Name()) + itemName, _ := fs.manager.Join(name, fi.Name()) if fi.IsDir() { err = fs.deleteRecursive(itemName) if err != nil { return errors.Wrap(err, "ReadDir") } - err = connection.sftpClient.RemoveDirectory(name) + err = fs.manager.RemoveDirectory(name) if err != nil { return errors.Wrap(err, "RemoveDirectory") } @@ -561,7 +491,7 @@ func (fs *Filesystem) deleteRecursive(name string) error { continue } - err := connection.sftpClient.Remove(itemName) + err := fs.manager.Remove(itemName) if err != nil { return errors.Wrap(err, "ReadDir") } @@ -578,17 +508,7 @@ func (fs *Filesystem) ListDirectory(p string) ([]Stat, error) { return nil, err } - connection, err := fs.manager.GetConnection() - if connection == nil || err != nil { - return nil, errors.New("processing error") - } - - // Check if the SFTP client connection is valid - if connection.sftpClient == nil { - return nil, errors.New("client connection is invalid") - } - - files, err := connection.sftpClient.ReadDir(cleaned) + files, err := fs.manager.ReadDir(cleaned) if err != nil { return nil, err } @@ -621,7 +541,7 @@ func (fs *Filesystem) ListDirectory(p string) ([]Stat, error) { // // @see https://github.com/pterodactyl/panel/issues/4059 if cleanedp != "" && f.Mode()&os.ModeNamedPipe == 0 { - file, err := connection.sftpClient.Open(filepath.Join(cleaned, f.Name())) + file, err := fs.manager.Open(filepath.Join(cleaned, f.Name())) if err != nil { panic(fmt.Errorf("Error SFTP Open: %s", err)) } @@ -671,13 +591,7 @@ func (fs *Filesystem) Chtimes(path string, atime, mtime time.Time) error { return nil } - connection, err := fs.manager.GetConnection() - if err != nil { - fmt.Println(err) - return err - } - - if err := connection.sftpClient.Chtimes(cleaned, atime, mtime); err != nil { + if err := fs.manager.Chtimes(cleaned, atime, mtime); err != nil { return err } diff --git a/server/filesystem/sftp_manager.go 
b/server/filesystem/sftp_manager.go
index 9270f7d3..ef8bd16b 100644
--- a/server/filesystem/sftp_manager.go
+++ b/server/filesystem/sftp_manager.go
@@ -1,12 +1,16 @@
 package filesystem
 
 import (
-	"bytes"
 	"fmt"
-	"log"
+	"io/fs"
+
+	"github.com/apex/log"
+
 	"sync"
 	"sync/atomic"
+	"time"
+
+	"emperror.dev/errors"
 	"github.com/pkg/sftp"
 	"golang.org/x/crypto/ssh"
 )
@@ -51,8 +55,8 @@ func (s *SFTPConn) GetClient() *sftp.Client {
 func (s *SFTPConn) Close() error {
 	s.Lock()
 	defer s.Unlock()
-	if s.closed == true {
-		return fmt.Errorf("Connection was already closed")
+	if s.closed {
+		return fmt.Errorf("connection was already closed")
 	}
 
 	s.shutdown <- true
@@ -65,19 +69,18 @@ // for SFTPConn returned by NewClient
 type BasicSFTPManager struct {
 	conns      []*SFTPConn
-	log        *log.Logger
+	log        *log.Entry
 	connString string
 	sshConfig  *ssh.ClientConfig
 }
 
 // NewBasicSFTPManager returns a BasicSFTPManager
 func NewBasicSFTPManager(connString string, config *ssh.ClientConfig) *BasicSFTPManager {
-	var buf bytes.Buffer
 	manager := &BasicSFTPManager{
 		conns:      make([]*SFTPConn, 0),
 		connString: connString,
 		sshConfig:  config,
-		log:        log.New(&buf, "logger: ", log.Lshortfile),
+		log:        log.WithFields(log.Fields{"sftp_manager": connString, "timeout": config.Timeout}),
 	}
 	return manager
 }
@@ -92,12 +95,12 @@ func (m *BasicSFTPManager) handleReconnects(c *SFTPConn) {
 		case <-c.shutdown:
 			c.sshConn.Close()
 			break
-		// case res := <-closed:
+		// case res := <-closed:
 		case <-closed:
-			// fmt.Println("Connection closed, reconnecting: ", res)
+			// m.log.WithField("error", res).Error("connection closed, reconnecting...")
 			conn, err := ssh.Dial("tcp", m.connString, m.sshConfig)
 			if err != nil {
-				// fmt.Println("Failed to reconnect:" + err.Error())
+				// m.log.WithField("error", err).Error("failed to reconnect.")
 				m.Close()
 				return
 			}
@@ -158,3 +161,174 @@ func (m *BasicSFTPManager) Close() error {
 	m.conns = nil
 	return nil
 }
+
+func (m *BasicSFTPManager) withConnectionCheck(fn func(*sftp.Client) (interface{}, error)) (interface{}, error) {
+	connection, err := m.GetConnection()
+	if err != nil {
+		return nil, err
+	}
+	if connection == nil || connection.sftpClient == nil {
+		return nil, errors.New("client connection is invalid")
+	}
+	return fn(connection.sftpClient)
+}
+
+func (m *BasicSFTPManager) Stat(p string) (fs.FileInfo, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.Stat(p)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	fileInfo, ok := result.(fs.FileInfo)
+	if !ok {
+		return nil, errors.New("invalid file info type")
+	}
+
+	return fileInfo, nil
+}
+
+func (m *BasicSFTPManager) Open(path string) (*sftp.File, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.Open(path)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	file, ok := result.(*sftp.File)
+	if !ok {
+		return nil, errors.New("invalid file type")
+	}
+	return file, nil
+}
+
+func (m *BasicSFTPManager) Rename(oldname, newname string) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.Rename(oldname, newname)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) Create(path string) (*sftp.File, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.Create(path)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	file, ok := result.(*sftp.File)
+	if !ok {
+		return nil, errors.New("invalid file type")
+	}
+	return file, nil
+}
+
+func (m *BasicSFTPManager) ReadDir(p string) ([]fs.FileInfo, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.ReadDir(p)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	fileInfo, ok := result.([]fs.FileInfo)
+	if !ok {
+		return nil, errors.New("invalid file info type")
+	}
+	return fileInfo, nil
+}
+
+func (m *BasicSFTPManager) MkdirAll(path string) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.MkdirAll(path)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) Chown(path string) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.Chown(path, 1000, 1000)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) OpenFile(path string, f int) (*sftp.File, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.OpenFile(path, f)
+	})
+	if err != nil {
+		return nil, err
+	}
+	// On success, withConnectionCheck hands back the *sftp.File returned by client.OpenFile.
+	return result.(*sftp.File), nil
+}
+
+func (m *BasicSFTPManager) Chmod(path string, mode fs.FileMode) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.Chmod(path, mode)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) Remove(path string) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.Remove(path)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) Mkdir(path string) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.Mkdir(path)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) Lstat(p string) (fs.FileInfo, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.Lstat(p)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	fileInfo, ok := result.(fs.FileInfo)
+	if !ok {
+		return nil, errors.New("invalid file info type")
+	}
+
+	return fileInfo, nil
+}
+
+func (m *BasicSFTPManager) RemoveDirectory(path string) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.RemoveDirectory(path)
+	})
+	return err
+}
+
+func (m *BasicSFTPManager) Join(elem ...string) (string, error) {
+	result, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return client.Join(elem...), nil
+	})
+	if err != nil {
+		return "", err
+	}
+
+	joinResult, ok := result.(string)
+	if !ok {
+		return "", errors.New("invalid join result type")
+	}
+
+	return joinResult, nil
+}
+
+func (m *BasicSFTPManager) Chtimes(path string, atime time.Time, mtime time.Time) error {
+	_, err := m.withConnectionCheck(func(client *sftp.Client) (interface{}, error) {
+		return nil, client.Chtimes(path, atime, mtime)
+	})
+	return err
+}
diff --git a/server/filesystem/stat.go b/server/filesystem/stat.go
index c6f46047..21f48ee1 100644
--- a/server/filesystem/stat.go
+++ b/server/filesystem/stat.go
@@ -6,7 +6,6 @@ import (
 	"strconv"
 	"time"
 
-	"emperror.dev/errors"
 	"github.com/gabriel-vasile/mimetype"
 	"github.com/goccy/go-json"
 )
@@ -54,21 +53,18 @@ func (fs *Filesystem) Stat(p string) (Stat, error) {
 }
 
 func (fs *Filesystem) unsafeStat(p string) (Stat, error) {
-	connection, err := fs.manager.GetConnection()
+	s, err := fs.manager.Stat(p)
 	if err != nil {
 		return Stat{}, err
 	}
-	if connection == nil || connection.sftpClient == nil {
-		return Stat{}, errors.New("client connection is
invalid") - } - s, err := connection.sftpClient.Stat(p) - if err != nil { - return Stat{}, err + + if s == nil { + return Stat{}, nil } var m *mimetype.MIME if !s.IsDir() { - f, err := connection.sftpClient.Open(p) + f, err := fs.manager.Open(p) if err != nil { return Stat{}, err } diff --git a/server/install.go b/server/install.go index 75f3943d..16b29850 100644 --- a/server/install.go +++ b/server/install.go @@ -15,6 +15,7 @@ import ( errors2 "emperror.dev/errors" "github.com/apex/log" "k8s.io/client-go/kubernetes" + "k8s.io/utils/pointer" "github.com/kubectyl/kuber/config" "github.com/kubectyl/kuber/environment" @@ -146,13 +147,15 @@ func (s *Server) internalInstall() error { } } - if err := s.fs.SetManager(fmt.Sprintf("%s:%v", ip, port)); err != nil { - return - } + s.fs.SetManager(fmt.Sprintf("%s:%v", ip, port)) } } s.Log().Info("booting server SFTP process for the first time after installation") - if err := s.Environment.CreateSFTP(s.Context()); err != nil { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := s.Environment.CreateSFTP(ctx, cancel); err != nil { return } }() @@ -481,7 +484,7 @@ func (ip *InstallationProcess) Execute() (string, error) { "storage": *resource.NewQuantity(ip.Server.DiskSpace(), resource.BinarySI), }, }, - StorageClassName: &[]string{config.Get().Cluster.StorageClass}[0], + StorageClassName: pointer.String(config.Get().Cluster.StorageClass), }, } @@ -525,19 +528,19 @@ func (ip *InstallationProcess) Execute() (string, error) { LocalObjectReference: corev1.LocalObjectReference{ Name: ip.Server.ID() + "-installer", }, - DefaultMode: &[]int32{int32(0755)}[0], + DefaultMode: pointer.Int32(0755), }, }, }, }, + SecurityContext: &corev1.PodSecurityContext{ + RunAsUser: pointer.Int64(1000), + RunAsNonRoot: pointer.Bool(true), + }, Containers: []corev1.Container{ { Name: "installer", Image: ip.Script.ContainerImage, - SecurityContext: &corev1.SecurityContext{ - AllowPrivilegeEscalation: &[]bool{false}[0], - RunAsUser: &[]int64{int64(0)}[0], - }, Command: []string{ "/mnt/install/install.sh", }, @@ -583,11 +586,12 @@ func (ip *InstallationProcess) Execute() (string, error) { } cfg := config.Get() - securityContext := pod.Spec.Containers[0].SecurityContext - if cfg.System.User.Rootless.Enabled { - securityContext.RunAsNonRoot = &[]bool{false}[0] - securityContext.RunAsUser = &[]int64{int64(cfg.System.User.Rootless.ContainerUID)}[0] - securityContext.RunAsGroup = &[]int64{int64(cfg.System.User.Rootless.ContainerGID)}[0] + + // Configure pod spec for restricted security standard + if cfg.Cluster.RestrictedPodSecurityStandard { + if err := ip.Server.Environment.ConfigurePodSpecForRestrictedStandard(&pod.Spec); err != nil { + return "", err + } } ip.Server.Log().WithField("install_script", ip.tempDir()+"/install.sh").Info("creating install container for server process") @@ -611,45 +615,63 @@ func (ip *InstallationProcess) Execute() (string, error) { // // If there is an error during the streaming output just report it and do nothing else, the // install can still run, the console just won't have any output. 
- go func(id string) { + go func(ctx context.Context, id string) { ip.Server.Events().Publish(DaemonMessageEvent, "Starting installation process, this could take a few minutes...") - err = wait.PollInfinite(time.Second, func() (bool, error) { - pod, err := ip.client.CoreV1().Pods(config.Get().Cluster.Namespace).Get(context.TODO(), ip.Server.ID()+"-installer", metav1.GetOptions{}) + err = wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) { + pod, err := ip.client.CoreV1().Pods(config.Get().Cluster.Namespace).Get(ctx, ip.Server.ID()+"-installer", metav1.GetOptions{}) if err != nil { return false, err } - switch pod.Status.Phase { - case corev1.PodRunning: - return true, nil - case corev1.PodFailed, corev1.PodSucceeded: - return false, nil + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name != "installer" { + continue + } + switch { + case containerStatus.State.Running != nil: + return true, nil + case containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason != "ContainerCreating": + return false, errors2.New(containerStatus.State.Waiting.Reason) + } + } return false, nil }) if err != nil { ip.Server.Log().WithField("error", err).Warn("pod never entered running phase") + return } if err := ip.StreamOutput(ctx, id); err != nil { ip.Server.Log().WithField("error", err).Warn("error connecting to server install stream output") + return } - }(string(r.UID)) + }(ctx, string(r.UID)) - err = wait.PollInfinite(time.Second, func() (bool, error) { - pod, err := ip.client.CoreV1().Pods(config.Get().Cluster.Namespace).Get(context.TODO(), ip.Server.ID()+"-installer", metav1.GetOptions{}) + err = wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) { + pod, err := ip.client.CoreV1().Pods(config.Get().Cluster.Namespace).Get(ctx, ip.Server.ID()+"-installer", metav1.GetOptions{}) if err != nil { return false, err } switch pod.Status.Phase { + case corev1.PodRunning: + return false, nil + case corev1.PodPending: + return false, nil case corev1.PodSucceeded: return true, nil case corev1.PodFailed: - return false, nil + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name == "installer" && containerStatus.State.Terminated != nil { + return false, errors2.New(containerStatus.State.Terminated.Message) + } + } + return true, nil + default: + return false, errors2.New("unknown pod status") } - return false, nil }) // Once the container has stopped running we can mark the install process as being completed. 
if err == nil { diff --git a/server/listeners.go b/server/listeners.go index e00753c8..6f982a6a 100644 --- a/server/listeners.go +++ b/server/listeners.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "context" "regexp" "strconv" "sync" @@ -89,12 +90,12 @@ func (s *Server) StartEventListeners() { s.Environment.Events().On(c) s.Environment.SetLogCallback(s.processConsoleOutputEvent) - go func() { - if err := s.Environment.WatchPodEvents(s.Context()); err != nil { + go func(ctx context.Context) { + if err := s.Environment.WatchPodEvents(ctx); err != nil { s.Log().WithField("error", err).Warn("failed to watch pod events") return } - }() + }(s.Context()) go func() { for { diff --git a/server/manager.go b/server/manager.go index cbd7ec68..a0645108 100644 --- a/server/manager.go +++ b/server/manager.go @@ -251,9 +251,7 @@ func (m *Manager) InitServer(data remote.ServerConfigurationResponse) (*Server, } } - if err := s.fs.SetManager(fmt.Sprintf("%s:%v", ip, port)); err != nil { - return nil, err - } + s.fs.SetManager(fmt.Sprintf("%s:%v", ip, port)) } } diff --git a/server/power.go b/server/power.go index d409ff40..bbf14b2b 100644 --- a/server/power.go +++ b/server/power.go @@ -156,9 +156,7 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error } } - if err := s.fs.SetManager(fmt.Sprintf("%s:%v", ip, port)); err != nil { - return err - } + s.fs.SetManager(fmt.Sprintf("%s:%v", ip, port)) } } @@ -180,9 +178,9 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error return err } - if err := s.Environment.CreateSFTP(s.Context()); err != nil { - return err - } + // if err := s.Environment.CreateSFTP(s.Context()); err != nil { + // return err + // } if action == PowerActionStop { return nil @@ -198,7 +196,11 @@ func (s *Server) HandlePowerAction(action PowerAction, waitSeconds ...int) error if err := s.Environment.Terminate(s.Context()); err != nil { return err } - return s.Environment.CreateSFTP(s.Context()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + return s.Environment.CreateSFTP(ctx, cancel) // return s.Environment.Terminate(s.Context()) } diff --git a/server/server.go b/server/server.go index 3707cadb..48679e09 100644 --- a/server/server.go +++ b/server/server.go @@ -232,7 +232,7 @@ func (s *Server) SyncWithConfiguration(cfg remote.ServerConfigurationResponse) e } // Reads the log file for a server up to a specified number of bytes. -func (s *Server) ReadLogfile(len int) ([]string, error) { +func (s *Server) ReadLogfile(len int64) ([]string, error) { return s.Environment.Readlog(len) } @@ -298,7 +298,11 @@ func (s *Server) OnStateChange() { if err := server.handleServerCrash(); err != nil { if IsTooFrequentCrashError(err) { server.Log().Info("did not restart server after crash; occurred too soon after the last") - if err := s.Environment.CreateSFTP(context.Background()); err != nil { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := s.Environment.CreateSFTP(ctx, cancel); err != nil { server.Log().WithField("error", err).Error("failed to start server SFTP") } } else { diff --git a/server/snapshot.go b/server/snapshot.go index 81cedbb2..0dacba19 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -141,7 +141,7 @@ func (s *Server) RestoreBackup(b snapshot.BackupInterface, reader io.ReadCloser) // in the file one at a time and writing them to the disk. 
s.Log().Debug("starting process for snapshot restoration") - storageClass := config.Get().Cluster.Namespace + storageClass := config.Get().Cluster.StorageClass // Override storage class if len(s.cfg.StorageClass) > 0 { storageClass = s.cfg.StorageClass diff --git a/server/snapshot/local.go b/server/snapshot/local.go index 46e449cb..5a714f68 100644 --- a/server/snapshot/local.go +++ b/server/snapshot/local.go @@ -25,7 +25,7 @@ type LocalBackup struct { var _ BackupInterface = (*LocalBackup)(nil) -func NewLocal(snapshotClient *snapshotclientset.Clientset, client remote.Client, clientset *kubernetes.Clientset, uuid string, ignore string) *LocalBackup { +func NewLocal(snapshotClient *snapshotclientset.Clientset, client remote.Client, clientset *kubernetes.Clientset, uuid string) *LocalBackup { return &LocalBackup{ Backup{ snapshotClient: snapshotClient, @@ -39,7 +39,7 @@ func NewLocal(snapshotClient *snapshotclientset.Clientset, client remote.Client, // LocateLocal finds the backup for a server and returns the local path. This // will obviously only work if the backup was created as a local backup. func LocateLocal(snapshotClient *snapshotclientset.Clientset, client remote.Client, clientset *kubernetes.Clientset, uuid string) (*LocalBackup, error) { - b := NewLocal(snapshotClient, client, clientset, uuid, "") + b := NewLocal(snapshotClient, client, clientset, uuid) return b, nil } @@ -125,7 +125,7 @@ func (b *LocalBackup) Generate(ctx context.Context, sid, ignore string) (*Archiv func (b *LocalBackup) Restore(ctx context.Context, sId string, disk int64, storageClass string, callback RestoreCallback) error { cfg := config.Get().Cluster - snapshot, err := b.snapshotClient.SnapshotV1().VolumeSnapshots(cfg.Namespace).Get(context.Background(), b.Identifier(), metav1.GetOptions{}) + snapshot, err := b.snapshotClient.SnapshotV1().VolumeSnapshots(cfg.Namespace).Get(ctx, b.Identifier(), metav1.GetOptions{}) if err != nil { return err } @@ -153,42 +153,53 @@ func (b *LocalBackup) Restore(ctx context.Context, sId string, disk int64, stora } if snapshot.Status != nil { - var zero int64 = 0 + var seconds int64 = 30 policy := metav1.DeletePropagationForeground - err = b.clientset.CoreV1().Pods(cfg.Namespace).Delete(context.Background(), sId, metav1.DeleteOptions{ - GracePeriodSeconds: &zero, + err = b.clientset.CoreV1().Pods(cfg.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{ + GracePeriodSeconds: &seconds, PropagationPolicy: &policy, + }, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("Service=Kubectyl,uuid=%s", sId), }) if err != nil && !errors.IsNotFound(err) { return err } - err = wait.PollImmediate(time.Second, 5*time.Minute, func() (bool, error) { - _, err := b.clientset.CoreV1().Pods(cfg.Namespace).Get(context.Background(), sId, metav1.GetOptions{}) - if err != nil && !errors.IsNotFound(err) { + if err := wait.PollUntilWithContext(ctx, 3*time.Second, func(ctx context.Context) (bool, error) { + pods, err := b.clientset.CoreV1().Pods(cfg.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("Service=Kubectyl,uuid=%s", sId), + }) + if err != nil { return false, err } - return true, nil - }) - if err != nil { + + if len(pods.Items) == 0 { + return true, nil + } + + return false, nil + }); err != nil { return err } - err = b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Delete(context.Background(), sId+"-pvc", metav1.DeleteOptions{ - GracePeriodSeconds: &zero, + err = b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Delete(ctx, pvc.Name, metav1.DeleteOptions{ 
+ GracePeriodSeconds: &seconds, PropagationPolicy: &policy, }) if err != nil && !errors.IsNotFound(err) { return err } - err = wait.PollImmediate(time.Second, 5*time.Minute, func() (bool, error) { - _, err := b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Get(context.Background(), sId+"-pvc", metav1.GetOptions{}) + err = wait.PollUntilWithContext(ctx, 3*time.Second, func(ctx context.Context) (bool, error) { + _, err := b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{}) if err != nil && !errors.IsNotFound(err) { return false, err } - return true, nil + if errors.IsNotFound(err) { + return true, nil + } + return false, nil }) if err != nil { return err @@ -213,14 +224,14 @@ func (b *LocalBackup) Restore(ctx context.Context, sId string, disk int64, stora }, } - _, err = b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Create(context.Background(), restoredPVC, metav1.CreateOptions{}) + _, err = b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Create(ctx, restoredPVC, metav1.CreateOptions{}) if err != nil { return err } // Wait for the PVC to become available. - err = wait.PollImmediate(time.Second, 5*time.Minute, func() (bool, error) { - pvc, err := b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Get(context.Background(), restoredPVC.Name, metav1.GetOptions{}) + err = wait.PollUntilWithContext(ctx, 3*time.Second, func(ctx context.Context) (bool, error) { + pvc, err := b.clientset.CoreV1().PersistentVolumeClaims(cfg.Namespace).Get(ctx, restoredPVC.Name, metav1.GetOptions{}) if err != nil { return false, err } diff --git a/system/system.go b/system/system.go index 0b33f699..cb9a75a5 100644 --- a/system/system.go +++ b/system/system.go @@ -1,32 +1,60 @@ package system import ( + "context" + "fmt" "runtime" "github.com/acobaugh/osrelease" "github.com/docker/docker/pkg/parsers/kernel" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/kubernetes" ) type Information struct { - Version string `json:"version"` - System System `json:"system"` + Version string `json:"version"` + Kubernetes KubernetesInfo `json:"kubernetes"` + System System `json:"system"` +} + +type KubernetesInfo struct { + Nodes []NodeInfo `json:"nodes"` + PodStatus map[string]string `json:"pod_status"` + Version version.Info `json:"version"` +} + +type NodeInfo struct { + Name string `json:"name"` + PodsNum int64 `json:"pods_num"` +} + +type PodInfo struct { + Name string `json:"name"` + Status corev1.PodPhase `json:"status"` } type System struct { Architecture string `json:"architecture"` CPUThreads int `json:"cpu_threads"` - MemoryBytes int64 `json:"memory_bytes"` + MemoryBytes uint64 `json:"memory_bytes"` KernelVersion string `json:"kernel_version"` OS string `json:"os"` OSType string `json:"os_type"` } -func GetSystemInformation() (*Information, error) { +func GetSystemInformation(clientset *kubernetes.Clientset, namespace string) (*Information, error) { k, err := kernel.GetKernelVersion() if err != nil { return nil, err } + info, err := getKubernetesInfo(context.Background(), clientset, namespace) + if err != nil { + return nil, err + } + release, err := osrelease.Read() if err != nil { return nil, err @@ -39,14 +67,83 @@ func GetSystemInformation() (*Information, error) { os = release["NAME"] } + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + systemMemory := memStats.Sys + return &Information{ Version: Version, + Kubernetes: 
KubernetesInfo{
+			Nodes:     info.Nodes,
+			PodStatus: info.PodStatus,
+			Version:   info.Version,
+		},
 		System: System{
 			Architecture:  runtime.GOARCH,
 			CPUThreads:    runtime.NumCPU(),
+			MemoryBytes:   systemMemory,
 			KernelVersion: k.String(),
 			OS:            os,
 			OSType:        runtime.GOOS,
 		},
 	}, nil
 }
+
+func getKubernetesInfo(ctx context.Context, clientset *kubernetes.Clientset, namespace string) (*KubernetesInfo, error) {
+	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("error getting nodes: %v", err)
+	}
+
+	// TODO: Change method to get namespace string
+	pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("error getting pods: %v", err)
+	}
+
+	info, err := clientset.Discovery().ServerVersion()
+	if err != nil {
+		return nil, fmt.Errorf("error getting server version: %v", err)
+	}
+
+	kubeInfo := &KubernetesInfo{
+		Nodes:     getNodeInfo(nodes),
+		PodStatus: getPodStatuses(pods),
+		Version: version.Info{
+			Major:        info.Major,
+			Minor:        info.Minor,
+			GitVersion:   info.GitVersion,
+			GitCommit:    info.GitCommit,
+			GitTreeState: info.GitTreeState,
+			BuildDate:    info.BuildDate,
+			GoVersion:    info.GoVersion,
+			Compiler:     info.Compiler,
+			Platform:     info.Platform,
+		},
+	}
+
+	return kubeInfo, nil
+}
+
+func getNodeInfo(nodes *corev1.NodeList) []NodeInfo {
+	nodeInfoList := make([]NodeInfo, 0)
+	for _, node := range nodes.Items {
+		podsNum, _ := node.Status.Capacity.Pods().AsInt64()
+
+		nodeInfo := NodeInfo{
+			Name:    node.Name,
+			PodsNum: podsNum,
+		}
+		nodeInfoList = append(nodeInfoList, nodeInfo)
+	}
+	return nodeInfoList
+}
+
+func getPodStatuses(pods *corev1.PodList) map[string]string {
+	podStatuses := make(map[string]string)
+	for _, pod := range pods.Items {
+		podStatuses[pod.Name] = string(pod.Status.Phase)
+	}
+	return podStatuses
+}

From 9bde87cd30cb19cd3812325e550e11f40af53d11 Mon Sep 17 00:00:00 2001
From: Andrei-Lucian Mociu
Date: Mon, 12 Jun 2023 06:04:57 +0000
Subject: [PATCH 2/2] Fix possible server crash if metrics clientset fails

---
 environment/kubernetes/pod.go | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/environment/kubernetes/pod.go b/environment/kubernetes/pod.go
index d0147dd9..fdedc70d 100644
--- a/environment/kubernetes/pod.go
+++ b/environment/kubernetes/pod.go
@@ -83,10 +83,11 @@ func (e *Environment) Attach(ctx context.Context) error {
 	// function is to avoid a hang situation when trying to attach to a container.
 	pollCtx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	defer func() {
-		e.SetState(environment.ProcessOfflineState)
-		e.SetStream(nil)
-	}()
+	// TODO: If metrics fail pod will be restarted / stopped.
+	// defer func() {
+	// 	e.SetState(environment.ProcessOfflineState)
+	// 	e.SetStream(nil)
+	// }()
 
 	go func() {
 		if err := e.pollResources(pollCtx); err != nil {
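
Note on the SFTP manager introduced in PATCH 1/2: every typed wrapper funnels its call through withConnectionCheck, which returns interface{} and forces each method to assert the result back to its concrete type. If the module targets Go 1.18 or newer (an assumption, not something these patches state), the same connection-check logic could be written once as a generic helper and the assertions dropped. A minimal sketch, reusing the BasicSFTPManager type and sftpClient field from the diff above; the withClient and statGeneric names are illustrative only:

package filesystem

import (
	"io/fs"

	"emperror.dev/errors"
	"github.com/pkg/sftp"
)

// withClient is a sketch of a generic variant of withConnectionCheck (Go 1.18+).
// It performs the same nil-connection guard before running the closure.
func withClient[T any](m *BasicSFTPManager, fn func(*sftp.Client) (T, error)) (T, error) {
	var zero T
	conn, err := m.GetConnection()
	if err != nil {
		return zero, err
	}
	if conn == nil || conn.sftpClient == nil {
		return zero, errors.New("client connection is invalid")
	}
	// The closure runs against a live client and returns a strongly typed value.
	return fn(conn.sftpClient)
}

// Example usage: a typed wrapper collapses to a one-liner with no type assertion.
func (m *BasicSFTPManager) statGeneric(p string) (fs.FileInfo, error) {
	return withClient(m, func(c *sftp.Client) (fs.FileInfo, error) { return c.Stat(p) })
}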