Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor command/container/run.go #5693

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
181 changes: 97 additions & 84 deletions cli/command/container/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"os"
"strings"
"syscall"

Expand Down Expand Up @@ -116,12 +117,8 @@ func runRun(ctx context.Context, dockerCli command.Cli, flags *pflag.FlagSet, ro
return runContainer(ctx, dockerCli, ropts, copts, containerCfg)
}

//nolint:gocyclo
func runContainer(ctx context.Context, dockerCli command.Cli, runOpts *runOptions, copts *containerOptions, containerCfg *containerConfig) error {
config := containerCfg.Config
stdout, stderr := dockerCli.Out(), dockerCli.Err()
apiClient := dockerCli.Client()

config.ArgsEscaped = false

if !runOpts.detach {
Expand All @@ -143,123 +140,141 @@ func runContainer(ctx context.Context, dockerCli command.Cli, runOpts *runOption
if err != nil {
return toStatusError(err)
}
if runOpts.sigProxy {
sigc := notifyAllSignals()
// since we're explicitly setting up signal handling here, and the daemon will
// get notified independently of the clients ctx cancellation, we use this context
// but without cancellation to avoid ForwardAllSignals from returning
// before all signals are forwarded.
bgCtx := context.WithoutCancel(ctx)
go ForwardAllSignals(bgCtx, apiClient, containerID, sigc)
defer signal.StopCatch(sigc)
}

ctx, cancelFun := context.WithCancel(context.WithoutCancel(ctx))
defer cancelFun()
apiClient := dockerCli.Client()

var (
waitDisplayID chan struct{}
errCh chan error
)
// New context here because we don't to cancel waiting on container exit/remove
// when we cancel attach, etc.
statusCtx, cancelStatusCtx := context.WithCancel(context.WithoutCancel(ctx))
defer cancelStatusCtx()
statusChan := waitExitOrRemoved(statusCtx, apiClient, containerID, copts.autoRemove)

var waitDisplayID chan struct{}
attach := config.AttachStdin || config.AttachStdout || config.AttachStderr
if !attach {
// Make this asynchronous to allow the client to write to stdin before having to read the ID
waitDisplayID = make(chan struct{})
go func() {
defer close(waitDisplayID)
_, _ = fmt.Fprintln(stdout, containerID)
_, _ = fmt.Fprintln(dockerCli.Out(), containerID)
}()
}
if attach {
detachKeys := dockerCli.ConfigFile().DetachKeys
if runOpts.detachKeys != "" {
detachKeys = runOpts.detachKeys
}

// ctx should not be cancellable here, as this would kill the stream to the container
// and we want to keep the stream open until the process in the container exits or until
// the user forcefully terminates the CLI.
closeFn, err := attachContainer(ctx, dockerCli, containerID, &errCh, config, container.AttachOptions{
Stream: true,
Stdin: config.AttachStdin,
Stdout: config.AttachStdout,
Stderr: config.AttachStderr,
DetachKeys: detachKeys,
})
var attachWait func(<-chan int, error) error
if attach {
attachWait, err = setupContainerAttach(ctx, dockerCli, containerID, runOpts, config)
if err != nil {
return err
}
defer closeFn()
}

// New context here because we don't to cancel waiting on container exit/remove
// when we cancel attach, etc.
statusCtx, cancelStatusCtx := context.WithCancel(context.WithoutCancel(ctx))
defer cancelStatusCtx()
statusChan := waitExitOrRemoved(statusCtx, apiClient, containerID, copts.autoRemove)

// start the container
if err := apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
// If we have hijackedIOStreamer, we should notify
// hijackedIOStreamer we are going to exit and wait
// to avoid the terminal are not restored.
err = apiClient.ContainerStart(ctx, containerID, container.StartOptions{})
if err != nil {
if attach {
cancelFun()
<-errCh
attachWait(statusChan, err)
}

if copts.autoRemove {
// wait container to be removed
<-statusChan
}
return toStatusError(err)
}

if attach {
return attachWait(statusChan, nil)
}

// Detached mode: wait for the id to be displayed and return.
if !attach {
// Detached mode
<-waitDisplayID
return nil
<-waitDisplayID
return nil
}

func setupContainerAttach(ctx context.Context, dockerCli command.Cli, containerID string, runOpts *runOptions, config *container.Config) (func(<-chan int, error) error, error) {
detachKeys := dockerCli.ConfigFile().DetachKeys
if runOpts.detachKeys != "" {
detachKeys = runOpts.detachKeys
}

if config.Tty && dockerCli.Out().IsTerminal() {
if err := MonitorTtySize(ctx, dockerCli, containerID, false); err != nil {
_, _ = fmt.Fprintln(stderr, "Error monitoring TTY size:", err)
}
// ctx should not be cancellable here, as this would kill the stream to the container
// and we want to keep the stream open until the process in the container exits or until
// the user forcefully terminates the CLI.
attachCtx, attachCancel := context.WithCancel(context.WithoutCancel(ctx))
errCh, closeFn, err := attachContainer(attachCtx, dockerCli, containerID, config, container.AttachOptions{
Stream: true,
Stdin: config.AttachStdin,
Stdout: config.AttachStdout,
Stderr: config.AttachStderr,
DetachKeys: detachKeys,
})
if err != nil {
attachCancel()
return nil, err
}

select {
case err := <-errCh:
if err != nil {
if _, ok := err.(term.EscapeError); ok {
// The user entered the detach escape sequence.
return nil
}
var sigc chan os.Signal
if runOpts.sigProxy {
sigc = notifyAllSignals()
// since we're explicitly setting up signal handling here, and the daemon will
// get notified independently of the clients ctx cancellation, we use this context
// but without cancellation to avoid ForwardAllSignals from returning
// before all signals are forwarded.
bgCtx := context.WithoutCancel(ctx)
go ForwardAllSignals(bgCtx, dockerCli.Client(), containerID, sigc)
}

logrus.Debugf("Error hijack: %s", err)
return err
return func(statusC <-chan int, err error) error {
defer closeFn()
if runOpts.sigProxy {
defer signal.StopCatch(sigc)
}
status := <-statusChan
if status != 0 {
return cli.StatusError{StatusCode: status}

// if the container failed to start, just cancel the streamer
// and wait for the terminal to be restored
if err != nil {
attachCancel()
<-errCh
return nil
}
case status := <-statusChan:
// notify hijackedIOStreamer that we're exiting and wait
// so that the terminal can be restored.
cancelFun()
<-errCh
if status != 0 {
return cli.StatusError{StatusCode: status}

if config.Tty && dockerCli.Out().IsTerminal() {
if err := MonitorTtySize(attachCtx, dockerCli, containerID, false); err != nil {
_, _ = fmt.Fprintln(dockerCli.Err(), "Error monitoring TTY size:", err)
}
}
}

return nil
select {
case err := <-errCh:
if err != nil {
if _, ok := err.(term.EscapeError); ok {
// The user entered the detach escape sequence.
return nil
}

logrus.Debugf("Error hijack: %s", err)
return err
}
status := <-statusC
if status != 0 {
return cli.StatusError{StatusCode: status}
}
case status := <-statusC:
// notify hijackedIOStreamer that we're exiting and wait
// so that the terminal can be restored.
attachCancel()
<-errCh
if status != 0 {
return cli.StatusError{StatusCode: status}
}
}
return nil
}, nil
}

func attachContainer(ctx context.Context, dockerCli command.Cli, containerID string, errCh *chan error, config *container.Config, options container.AttachOptions) (func(), error) {
func attachContainer(ctx context.Context, dockerCli command.Cli, containerID string, config *container.Config, options container.AttachOptions) (chan error, func(), error) {
resp, errAttach := dockerCli.Client().ContainerAttach(ctx, containerID, options)
if errAttach != nil {
return nil, errAttach
return nil, nil, errAttach
}

var (
Expand All @@ -281,8 +296,6 @@ func attachContainer(ctx context.Context, dockerCli command.Cli, containerID str
}

ch := make(chan error, 1)
*errCh = ch

go func() {
ch <- func() error {
streamer := hijackedIOStreamer{
Expand All @@ -301,7 +314,7 @@ func attachContainer(ctx context.Context, dockerCli command.Cli, containerID str
return errAttach
}()
}()
return resp.Close, nil
return ch, resp.Close, nil
}

// withHelp decorates the error with a suggestion to use "--help".
Expand Down
29 changes: 24 additions & 5 deletions cli/command/container/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func TestRunAttachTermination(t *testing.T) {
var conn net.Conn
killCh := make(chan struct{})
attachCh := make(chan struct{})
startCh := make(chan struct{})
containerExitC := make(chan struct{})
fakeCLI := test.NewFakeCli(&fakeClient{
createContainerFunc: func(_ *container.Config, _ *container.HostConfig, _ *network.NetworkingConfig, _ *specs.Platform, _ string) (container.CreateResponse, error) {
return container.CreateResponse{
Expand All @@ -158,6 +160,7 @@ func TestRunAttachTermination(t *testing.T) {
},
containerKillFunc: func(ctx context.Context, containerID, signal string) error {
killCh <- struct{}{}
containerExitC <- struct{}{}
return nil
},
containerAttachFunc: func(ctx context.Context, containerID string, options container.AttachOptions) (types.HijackedResponse, error) {
Expand All @@ -173,11 +176,19 @@ func TestRunAttachTermination(t *testing.T) {
responseChan := make(chan container.WaitResponse, 1)
errChan := make(chan error)

responseChan <- container.WaitResponse{
StatusCode: 130,
}
go func() {
<-containerExitC
responseChan <- container.WaitResponse{
StatusCode: 130,
}
}()
return responseChan, errChan
},
containerStartFunc: func(containerID string, options container.StartOptions) error {
startCh <- struct{}{}
return nil
},

// use new (non-legacy) wait API
// see: 38591f20d07795aaef45d400df89ca12f29c603b
Version: "1.30",
Expand All @@ -201,16 +212,24 @@ func TestRunAttachTermination(t *testing.T) {
case <-attachCh:
}

// run command should attempt to start the container
select {
case <-time.After(5 * time.Second):
t.Fatal("containerStartCh was not called before the timeout")
case <-startCh:
}

assert.NilError(t, syscall.Kill(syscall.Getpid(), syscall.SIGINT))
// end stream from "container" so that we'll detach
conn.Close()

select {
case <-killCh:
case <-time.After(5 * time.Second):
t.Fatal("containerKillFunc was not called before the timeout")
}

// end stream from "container" so that we'll detach
conn.Close()

select {
case cmdErr := <-cmdErrC:
assert.Equal(t, cmdErr, cli.StatusError{
Expand Down
Loading