Skip to content

Commit

Permalink
implement replica exec
Browse files Browse the repository at this point in the history
This implements the execution of commands in deplo.io application replicas. If there are multiple replicas, the first replica will be used.
  • Loading branch information
thirdeyenick committed Jul 18, 2024
1 parent 1fd7644 commit 5bc8f04
Show file tree
Hide file tree
Showing 9 changed files with 648 additions and 26 deletions.
26 changes: 26 additions & 0 deletions api/util/release.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package util

import (
"sort"

apps "github.com/ninech/apis/apps/v1alpha1"
)

// OrderReleaseList orders the given list of releases, moving the latest
// release to the beginning of the list
func OrderReleaseList(releaseList *apps.ReleaseList) {
if len(releaseList.Items) <= 1 {
return
}

sort.Slice(releaseList.Items, func(i, j int) bool {
applicationNameI := releaseList.Items[i].ObjectMeta.Labels[ApplicationNameLabel]
applicationNameJ := releaseList.Items[j].ObjectMeta.Labels[ApplicationNameLabel]

if applicationNameI != applicationNameJ {
return applicationNameI < applicationNameJ
}

return releaseList.Items[i].CreationTimestampNano < releaseList.Items[j].CreationTimestampNano
})
}
235 changes: 235 additions & 0 deletions exec/application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package exec

import (
"context"
"fmt"
"io"

b64 "encoding/base64"

dockerterm "github.com/moby/term"
apps "github.com/ninech/apis/apps/v1alpha1"
infrastructure "github.com/ninech/apis/infrastructure/v1alpha1"
meta "github.com/ninech/apis/meta/v1alpha1"
"github.com/ninech/nctl/api"
"github.com/ninech/nctl/api/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util/term"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
appBuildTypeBuildpack appBuildType = "buildpack"
appBuildTypeDockerfile appBuildType = "dockerfile"
buildpackEntrypoint = "launcher"
defaultShellBuildpack = "/bin/bash"
defaultShellDockerfile = "/bin/sh"
)

// appBuildType describes the way how the app was build (buildpack/dockerfile)
type appBuildType string

type remoteCommandParameters struct {
replicaName string
replicaNamespace string
command []string
tty bool
enableStdin bool
stdin io.Reader
stdout io.Writer
stderr io.Writer
restConfig *rest.Config
}

type applicationCmd struct {
resourceCmd
Stdin bool `name:"stdin" short:"i" help:"Pass stdin to the application" default:"true"`
Tty bool `name:"tty" short:"t" help:"Stdin is a TTY" default:"true"`
Command []string `arg:"" help:"command to execute" optional:""`
}

func (cmd *applicationCmd) Run(ctx context.Context, client *api.Client, exec *Cmd) error {
replicaName, buildType, err := cmd.getReplica(ctx, client)
if err != nil {
return fmt.Errorf("error when searching for replica to connect: %w", err)
}
config, err := deploioRestConfig(ctx, client)
if err != nil {
return fmt.Errorf("can not create deplo.io cluster rest config: %w", err)
}
// use dockerterm to gather the std io streams (windows supported)
stdin, stdout, stderr := dockerterm.StdStreams()
return executeRemoteCommand(
ctx,
remoteCommandParameters{
replicaName: replicaName,
replicaNamespace: client.Project,
command: replicaCommand(buildType, cmd.Command),
tty: cmd.Tty,
enableStdin: cmd.Stdin,
stdin: stdin,
stdout: stdout,
stderr: stderr,
restConfig: config,
})
}

func latestAvailableRelease(releases *apps.ReleaseList) *apps.Release {
util.OrderReleaseList(releases)
for _, release := range releases.Items {
if release.Status.AtProvider.ReleaseStatus == apps.ReleaseProcessStatusAvailable {
return &release
}
}
return nil
}

// getReplica finds a replica of the latest available release
func (cmd *applicationCmd) getReplica(ctx context.Context, client *api.Client) (string, appBuildType, error) {
releases := &apps.ReleaseList{}
if err := client.List(
ctx,
releases,
runtimeclient.InNamespace(client.Project),
runtimeclient.MatchingLabels{util.ApplicationNameLabel: cmd.Name},
); err != nil {
return "", "", err
}

if len(releases.Items) == 0 {
return "", "", fmt.Errorf("no releases found for application %s", cmd.Name)
}
latestAvailableRelease := latestAvailableRelease(releases)
if latestAvailableRelease == nil {
return "", "", fmt.Errorf("no ready release found for application %s", cmd.Name)
}
buildType := appBuildTypeBuildpack
if latestAvailableRelease.Spec.ForProvider.DockerfileBuild {
buildType = appBuildTypeDockerfile
}
if len(latestAvailableRelease.Status.AtProvider.ReplicaObservation) == 0 {
return "", buildType, fmt.Errorf("no replica information found for release %s", latestAvailableRelease.Name)
}
if replica := readyReplica(latestAvailableRelease.Status.AtProvider.ReplicaObservation); replica != "" {
return replica, buildType, nil
}
return "", buildType, fmt.Errorf("no ready replica found for release %s", latestAvailableRelease.Name)
}

func readyReplica(replicaObs []apps.ReplicaObservation) string {
for _, obs := range replicaObs {
if obs.Status == apps.ReplicaStatusReady {
return obs.ReplicaName
}
}
return ""
}

// setupTTY sets up a TTY for command execution
func setupTTY(params *remoteCommandParameters) term.TTY {
t := term.TTY{
Out: params.stdout,
}
if !params.enableStdin {
return t
}
t.In = params.stdin
if !params.tty {
return t
}
if !t.IsTerminalIn() {
// if this is not a suitable TTY, we don't request one in the
// exec call and don't set the terminal into RAW mode either
params.tty = false
return t
}
// if we get to here, the user wants to attach stdin, wants a TTY, and
// os.Stdin is a terminal, so we can safely set t.Raw to true
t.Raw = true
return t
}

func executeRemoteCommand(ctx context.Context, params remoteCommandParameters) error {
coreClient, err := kubernetes.NewForConfig(params.restConfig)
if err != nil {
return err
}

tty := setupTTY(&params)
var sizeQueue remotecommand.TerminalSizeQueue
if tty.Raw {
// this call spawns a goroutine to monitor/update the terminal size
sizeQueue = tty.MonitorSize(tty.GetSize())

// unset stderr if it was previously set because both stdout
// and stderr go over params.stdout when tty is
// true
params.stderr = nil
}
fn := func() error {
request := coreClient.CoreV1().RESTClient().
Post().
Namespace(params.replicaNamespace).
Resource("pods").
Name(params.replicaName).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: params.command,
Stdin: params.enableStdin,
Stdout: params.stdout != nil,
Stderr: params.stderr != nil,
TTY: params.tty,
}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(params.restConfig, "POST", request.URL())
if err != nil {
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: tty.In,
Stdout: params.stdout,
Stderr: params.stderr,
Tty: params.tty,
TerminalSizeQueue: sizeQueue,
})

}
return tty.Safe(fn)
}

func deploioRestConfig(ctx context.Context, client *api.Client) (*rest.Config, error) {
config := rest.CopyConfig(client.Config)
deploioClusterData := &infrastructure.ClusterData{}
if err := client.Get(ctx, types.NamespacedName{Name: meta.ClusterDataDeploioName}, deploioClusterData); err != nil {
return nil, fmt.Errorf("can not gather deplo.io cluster connection details: %w", err)
}
config.Host = deploioClusterData.Status.AtProvider.APIEndpoint
var err error
if config.CAData, err = b64.StdEncoding.DecodeString(deploioClusterData.Status.AtProvider.APICACert); err != nil {
return nil, fmt.Errorf("can not decode deplo.io cluster CA certificate: %w", err)
}
return config, nil
}

func replicaCommand(buildType appBuildType, command []string) []string {
switch buildType {
case appBuildTypeBuildpack:
execute := append([]string{buildpackEntrypoint}, command...)
if len(command) == 0 {
execute = []string{buildpackEntrypoint, defaultShellBuildpack}
}
return execute
case appBuildTypeDockerfile:
if len(command) == 0 {
return []string{defaultShellDockerfile}
}
return command
default:
return command
}
}
Loading

0 comments on commit 5bc8f04

Please sign in to comment.