Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement replica exec #130

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions api/util/release.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package util

import (
"sort"

apps "github.com/ninech/apis/apps/v1alpha1"
)

// OrderReleaseList orders the given list of releases, moving the latest
// release to the beginning of the list
func OrderReleaseList(releaseList *apps.ReleaseList) {
if len(releaseList.Items) <= 1 {
return
}

sort.Slice(releaseList.Items, func(i, j int) bool {
applicationNameI := releaseList.Items[i].ObjectMeta.Labels[ApplicationNameLabel]
applicationNameJ := releaseList.Items[j].ObjectMeta.Labels[ApplicationNameLabel]

if applicationNameI != applicationNameJ {
return applicationNameI < applicationNameJ
}

return releaseList.Items[i].CreationTimestampNano < releaseList.Items[j].CreationTimestampNano
})
}
254 changes: 254 additions & 0 deletions exec/application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package exec

import (
"context"
"fmt"
"io"

b64 "encoding/base64"

dockerterm "github.com/moby/term"
apps "github.com/ninech/apis/apps/v1alpha1"
infrastructure "github.com/ninech/apis/infrastructure/v1alpha1"
meta "github.com/ninech/apis/meta/v1alpha1"
"github.com/ninech/nctl/api"
"github.com/ninech/nctl/api/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util/term"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
appBuildTypeBuildpack appBuildType = "buildpack"
appBuildTypeDockerfile appBuildType = "dockerfile"
buildpackEntrypoint = "launcher"
defaultShellBuildpack = "/bin/bash"
defaultShellDockerfile = "/bin/sh"
)

// appBuildType describes the way how the app was build (buildpack/dockerfile)
type appBuildType string

type remoteCommandParameters struct {
replicaName string
replicaNamespace string
command []string
tty bool
enableStdin bool
stdin io.Reader
stdout io.Writer
stderr io.Writer
restConfig *rest.Config
}

type applicationCmd struct {
resourceCmd
Stdin bool `name:"stdin" short:"i" help:"Pass stdin to the application" default:"true"`
Tty bool `name:"tty" short:"t" help:"Stdin is a TTY" default:"true"`
Command []string `arg:"" help:"command to execute" optional:""`
}

// Help displays examples for the application exec command
func (ac applicationCmd) Help() string {
return `Examples:
# Open a shell in a buildpack/dockerfile built application. The dockerfile
# built application needs a valid "/bin/sh" shell to be installed.
nctl exec app myapp

# Get output from running the 'date' command in an application replica.
nctl exec app myapp -- date

# Use redirection to execute a comand.
echo date | nctl exec app myapp

# In certain situations it might be needed to not redirect stdin. This can be
# achieved by using the "stdin" flag:
nctl exec app --stdin=false myapp -- <command>
`
}

func (cmd *applicationCmd) Run(ctx context.Context, client *api.Client, exec *Cmd) error {
replicaName, buildType, err := cmd.getReplica(ctx, client)
if err != nil {
return fmt.Errorf("error when searching for replica to connect: %w", err)
}
config, err := deploioRestConfig(ctx, client)
if err != nil {
return fmt.Errorf("can not create deplo.io cluster rest config: %w", err)
}
// use dockerterm to gather the std io streams (windows supported)
stdin, stdout, stderr := dockerterm.StdStreams()
return executeRemoteCommand(
ctx,
remoteCommandParameters{
replicaName: replicaName,
replicaNamespace: client.Project,
command: replicaCommand(buildType, cmd.Command),
tty: cmd.Tty,
enableStdin: cmd.Stdin,
stdin: stdin,
stdout: stdout,
stderr: stderr,
restConfig: config,
})
}

func latestAvailableRelease(releases *apps.ReleaseList) *apps.Release {
util.OrderReleaseList(releases)
for _, release := range releases.Items {
if release.Status.AtProvider.ReleaseStatus == apps.ReleaseProcessStatusAvailable {
return &release
}
}
return nil
}

// getReplica finds a replica of the latest available release
func (cmd *applicationCmd) getReplica(ctx context.Context, client *api.Client) (string, appBuildType, error) {
releases := &apps.ReleaseList{}
if err := client.List(
ctx,
releases,
runtimeclient.InNamespace(client.Project),
runtimeclient.MatchingLabels{util.ApplicationNameLabel: cmd.Name},
); err != nil {
return "", "", err
}

if len(releases.Items) == 0 {
return "", "", fmt.Errorf("no releases found for application %s", cmd.Name)
}
latestAvailableRelease := latestAvailableRelease(releases)
if latestAvailableRelease == nil {
return "", "", fmt.Errorf("no ready release found for application %s", cmd.Name)
}
buildType := appBuildTypeBuildpack
if latestAvailableRelease.Spec.ForProvider.DockerfileBuild {
buildType = appBuildTypeDockerfile
}
if len(latestAvailableRelease.Status.AtProvider.ReplicaObservation) == 0 {
return "", buildType, fmt.Errorf("no replica information found for release %s", latestAvailableRelease.Name)
}
if replica := readyReplica(latestAvailableRelease.Status.AtProvider.ReplicaObservation); replica != "" {
return replica, buildType, nil
}
return "", buildType, fmt.Errorf("no ready replica found for release %s", latestAvailableRelease.Name)
}

func readyReplica(replicaObs []apps.ReplicaObservation) string {
for _, obs := range replicaObs {
if obs.Status == apps.ReplicaStatusReady {
return obs.ReplicaName
}
}
return ""
}

// setupTTY sets up a TTY for command execution
func setupTTY(params *remoteCommandParameters) term.TTY {
t := term.TTY{
Out: params.stdout,
}
if !params.enableStdin {
return t
}
t.In = params.stdin
if !params.tty {
return t
}
if !t.IsTerminalIn() {
// if this is not a suitable TTY, we don't request one in the
// exec call and don't set the terminal into RAW mode either
params.tty = false
return t
}
// if we get to here, the user wants to attach stdin, wants a TTY, and
// os.Stdin is a terminal, so we can safely set t.Raw to true
t.Raw = true
return t
}

func executeRemoteCommand(ctx context.Context, params remoteCommandParameters) error {
coreClient, err := kubernetes.NewForConfig(params.restConfig)
if err != nil {
return err
}

tty := setupTTY(&params)
var sizeQueue remotecommand.TerminalSizeQueue
if tty.Raw {
// this call spawns a goroutine to monitor/update the terminal size
sizeQueue = tty.MonitorSize(tty.GetSize())

// unset stderr if it was previously set because both stdout
// and stderr go over params.stdout when tty is
// true
params.stderr = nil
}
fn := func() error {
request := coreClient.CoreV1().RESTClient().
Post().
Namespace(params.replicaNamespace).
Resource("pods").
Name(params.replicaName).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: params.command,
Stdin: params.enableStdin,
Stdout: params.stdout != nil,
Stderr: params.stderr != nil,
TTY: params.tty,
}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(params.restConfig, "POST", request.URL())
if err != nil {
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: tty.In,
Stdout: params.stdout,
Stderr: params.stderr,
Tty: params.tty,
TerminalSizeQueue: sizeQueue,
})

}
return tty.Safe(fn)
}

func deploioRestConfig(ctx context.Context, client *api.Client) (*rest.Config, error) {
config := rest.CopyConfig(client.Config)
deploioClusterData := &infrastructure.ClusterData{}
if err := client.Get(ctx, types.NamespacedName{Name: meta.ClusterDataDeploioName}, deploioClusterData); err != nil {
return nil, fmt.Errorf("can not gather deplo.io cluster connection details: %w", err)
}
config.Host = deploioClusterData.Status.AtProvider.APIEndpoint
var err error
if config.CAData, err = b64.StdEncoding.DecodeString(deploioClusterData.Status.AtProvider.APICACert); err != nil {
return nil, fmt.Errorf("can not decode deplo.io cluster CA certificate: %w", err)
}
return config, nil
}

func replicaCommand(buildType appBuildType, command []string) []string {
switch buildType {
case appBuildTypeBuildpack:
execute := append([]string{buildpackEntrypoint}, command...)
if len(command) == 0 {
execute = []string{buildpackEntrypoint, defaultShellBuildpack}
}
return execute
case appBuildTypeDockerfile:
if len(command) == 0 {
return []string{defaultShellDockerfile}
}
return command
default:
return command
}
}
Loading