From b1ae048bf15b09a66e9e9f3bca724deae159a7e9 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 13 Nov 2023 23:57:12 +0000 Subject: [PATCH] Add dial-stdio command This allows the buildx CLI to act a proxy to the configured instance. It allows external code to use buildx itself as a driver for connecting to buildkitd instances. Instance and node selection should follow the same semantics as as `buildx build`, including taking into account the `BUILDX_BUILDER` env var and the `--builder` global flag. Signed-off-by: Brian Goff --- build/dial.go | 62 +++++++++++++ commands/dial_stdio.go | 132 ++++++++++++++++++++++++++++ commands/root.go | 1 + docs/reference/buildx.md | 1 + docs/reference/buildx_dial-stdio.md | 16 ++++ driver/docker-container/driver.go | 11 ++- driver/docker/driver.go | 6 +- driver/driver.go | 1 + driver/kubernetes/driver.go | 7 +- driver/remote/driver.go | 63 +++++++++++-- tests/dialstdio.go | 124 ++++++++++++++++++++++++++ tests/integration_test.go | 1 + 12 files changed, 414 insertions(+), 11 deletions(-) create mode 100644 build/dial.go create mode 100644 commands/dial_stdio.go create mode 100644 docs/reference/buildx_dial-stdio.md create mode 100644 tests/dialstdio.go diff --git a/build/dial.go b/build/dial.go new file mode 100644 index 00000000000..78391ebbc1d --- /dev/null +++ b/build/dial.go @@ -0,0 +1,62 @@ +package build + +import ( + "context" + stderrors "errors" + "net" + + "github.com/containerd/containerd/platforms" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer, platform *v1.Platform) (net.Conn, error) { + nodes, err := filterAvailableNodes(nodes) + if err != nil { + return nil, err + } + + if len(nodes) == 0 { + return nil, errors.New("no nodes available") + } + + var pls []v1.Platform + if platform != nil { + pls = []v1.Platform{*platform} + } + + opts := map[string]Options{"default": {Platforms: pls}} + resolved, err := resolveDrivers(ctx, nodes, opts, pw) + if err != nil { + return nil, err + } + + var dialError error + for _, ls := range resolved { + for _, rn := range ls { + if platform != nil { + p := *platform + var found bool + for _, pp := range rn.platforms { + if platforms.Only(p).Match(pp) { + found = true + break + } + } + if !found { + continue + } + } + + conn, err := nodes[rn.driverIndex].Driver.Dial(ctx) + if err == nil { + return conn, nil + } + dialError = stderrors.Join(err) + } + } + + return nil, errors.Wrap(dialError, "no nodes available") +} diff --git a/commands/dial_stdio.go b/commands/dial_stdio.go new file mode 100644 index 00000000000..92582dd523f --- /dev/null +++ b/commands/dial_stdio.go @@ -0,0 +1,132 @@ +package commands + +import ( + "io" + "net" + "os" + + "github.com/containerd/containerd/platforms" + "github.com/docker/buildx/build" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + "github.com/docker/cli/cli/command" + "github.com/moby/buildkit/util/appcontext" + "github.com/moby/buildkit/util/progress/progressui" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +type stdioOptions struct { + builder string + platform string + progress string +} + +func runDialStdio(dockerCli command.Cli, opts stdioOptions) error { + ctx := appcontext.Context() + + contextPathHash, _ := os.Getwd() + b, err := builder.New(dockerCli, + builder.WithName(opts.builder), + builder.WithContextPathHash(contextPathHash), + ) + if err != nil { + return err + } + + if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil { + return errors.Wrapf(err, "failed to update builder last activity time") + } + nodes, err := b.LoadNodes(ctx) + if err != nil { + return err + } + + printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.DisplayMode(opts.progress), progress.WithPhase("dial-stdio"), progress.WithDesc("builder: "+b.Name, "builder:"+b.Name)) + if err != nil { + return err + } + + var p *v1.Platform + if opts.platform != "" { + pp, err := platforms.Parse(opts.platform) + if err != nil { + return errors.Wrapf(err, "invalid platform %q", opts.platform) + } + p = &pp + } + + defer printer.Wait() + + return progress.Wrap("Proxying to builder", printer.Write, func(sub progress.SubLogger) error { + var conn net.Conn + + err := sub.Wrap("Dialing builder", func() error { + conn, err = build.Dial(ctx, nodes, printer, p) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + defer conn.Close() + + go func() { + <-ctx.Done() + closeWrite(conn) + }() + + var eg errgroup.Group + + eg.Go(func() error { + _, err := io.Copy(conn, os.Stdin) + closeWrite(conn) + return err + }) + eg.Go(func() error { + _, err := io.Copy(os.Stdout, conn) + closeRead(conn) + return err + }) + return eg.Wait() + }) +} + +func closeRead(conn net.Conn) error { + if c, ok := conn.(interface{ CloseRead() error }); ok { + return c.CloseRead() + } + return conn.Close() +} + +func closeWrite(conn net.Conn) error { + if c, ok := conn.(interface{ CloseWrite() error }); ok { + return c.CloseWrite() + } + return conn.Close() +} + +func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command { + opts := stdioOptions{} + + cmd := &cobra.Command{ + Use: "dial-stdio", + Short: "Dial stdio", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + opts.builder = rootOpts.builder + return runDialStdio(dockerCli, opts) + }, + } + + flags := cmd.Flags() + cmd.Flags() + flags.StringVar(&opts.platform, "platform", os.Getenv("DOCKER_DEFAULT_PLATFORM"), "Target platform: this is used for node selection") + flags.StringVar(&opts.progress, "progress", "quiet", "Set type of progress output (auto, plain, tty).") + return cmd +} diff --git a/commands/root.go b/commands/root.go index 1c862489331..cf351200184 100644 --- a/commands/root.go +++ b/commands/root.go @@ -83,6 +83,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) { buildCmd(dockerCli, opts, nil), bakeCmd(dockerCli, opts), createCmd(dockerCli), + dialStdioCmd(dockerCli, opts), rmCmd(dockerCli, opts), lsCmd(dockerCli), useCmd(dockerCli, opts), diff --git a/docs/reference/buildx.md b/docs/reference/buildx.md index 1ca845ba65b..3fb029db297 100644 --- a/docs/reference/buildx.md +++ b/docs/reference/buildx.md @@ -15,6 +15,7 @@ Extended build capabilities with BuildKit | [`build`](buildx_build.md) | Start a build | | [`create`](buildx_create.md) | Create a new builder instance | | [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) | +| [`dial-stdio`](buildx_dial-stdio.md) | Dial stdio | | [`du`](buildx_du.md) | Disk usage | | [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry | | [`inspect`](buildx_inspect.md) | Inspect current builder instance | diff --git a/docs/reference/buildx_dial-stdio.md b/docs/reference/buildx_dial-stdio.md new file mode 100644 index 00000000000..e6949c42dea --- /dev/null +++ b/docs/reference/buildx_dial-stdio.md @@ -0,0 +1,16 @@ +# docker buildx dial-stdio + + +Dial stdio + +### Options + +| Name | Type | Default | Description | +|:-------------|:---------|:--------|:-------------------------------------------------| +| `--builder` | `string` | | Override the configured builder instance | +| `--platform` | `string` | | Target platform: this is used for node selection | +| `--progress` | `string` | `quiet` | Set type of progress output (auto, plain, tty). | + + + + diff --git a/driver/docker-container/driver.go b/driver/docker-container/driver.go index 4998b0cc008..53350ceae88 100644 --- a/driver/docker-container/driver.go +++ b/driver/docker-container/driver.go @@ -384,13 +384,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) if err != nil { return nil, err } - conn = demuxConn(conn) + return conn, nil +} + +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { + conn, err := d.Dial(ctx) + if err != nil { + return nil, err + } exp, _, err := detect.Exporter() if err != nil { diff --git a/driver/docker/driver.go b/driver/docker/driver.go index 2c8b1637a06..85f4248cd4d 100644 --- a/driver/docker/driver.go +++ b/driver/docker/driver.go @@ -57,10 +57,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) +} + func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts := []client.ClientOpt{ client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) + return d.Dial(ctx) }), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { return d.DockerAPI.DialHijack(ctx, "/session", proto, meta) }), diff --git a/driver/driver.go b/driver/driver.go index 16d43d7af3a..6d3c546737c 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -59,6 +59,7 @@ type Driver interface { Version(context.Context) (string, error) Stop(ctx context.Context, force bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error + Dial(ctx context.Context) (net.Conn, error) Client(ctx context.Context) (*client.Client, error) Features(ctx context.Context) map[Feature]bool HostGatewayIP(ctx context.Context) (net.IP, error) diff --git a/driver/kubernetes/driver.go b/driver/kubernetes/driver.go index 9ce1dd3eaa5..29ffb21e08b 100644 --- a/driver/kubernetes/driver.go +++ b/driver/kubernetes/driver.go @@ -189,7 +189,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { restClient := d.clientset.CoreV1().RESTClient() restClientConfig, err := d.KubeClientConfig.ClientConfig() if err != nil { @@ -208,7 +208,10 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { if err != nil { return nil, err } + return conn, nil +} +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { exp, _, err := detect.Exporter() if err != nil { return nil, err @@ -216,7 +219,7 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { var opts []client.ClientOpt opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return conn, nil + return d.Dial(ctx) })) if td, ok := exp.(client.TracerDelegate); ok { opts = append(opts, client.WithTracerDelegate(td)) diff --git a/driver/remote/driver.go b/driver/remote/driver.go index e4e2b05715a..2f2ab58928f 100644 --- a/driver/remote/driver.go +++ b/driver/remote/driver.go @@ -2,14 +2,18 @@ package remote import ( "context" - "errors" + "crypto/tls" + "crypto/x509" "net" + "os" + "strings" "time" "github.com/docker/buildx/driver" "github.com/docker/buildx/util/progress" "github.com/moby/buildkit/client" "github.com/moby/buildkit/util/tracing/detect" + "github.com/pkg/errors" ) type Driver struct { @@ -82,14 +86,61 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts = append(opts, client.WithTracerDelegate(td)) } + opts = append(opts, client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return d.Dial(ctx) + })) + + return client.New(ctx, "", opts...) +} + +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://") + if !ok { + return nil, errors.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr) + } + + dialer := &net.Dialer{} + + conn, err := dialer.DialContext(ctx, network, addr) + if err != nil { + return nil, errors.WithStack(err) + } + if d.tlsOpts != nil { - opts = append(opts, []client.ClientOpt{ - client.WithServerConfig(d.tlsOpts.serverName, d.tlsOpts.caCert), - client.WithCredentials(d.tlsOpts.cert, d.tlsOpts.key), - }...) + cfg, err := loadTLS(d.tlsOpts) + if err != nil { + return nil, errors.Wrap(err, "error loading tls config") + } + conn = tls.Client(conn, cfg) + } + return conn, nil +} + +func loadTLS(opts *tlsOpts) (*tls.Config, error) { + cfg := &tls.Config{ + ServerName: opts.serverName, + RootCAs: x509.NewCertPool(), + } + + if opts.caCert != "" { + ca, err := os.ReadFile(opts.caCert) + if err != nil { + return nil, errors.Wrap(err, "could not read ca certificate") + } + if ok := cfg.RootCAs.AppendCertsFromPEM(ca); !ok { + return nil, errors.New("failed to append ca certs") + } + } + + if opts.cert != "" || opts.key != "" { + cert, err := tls.LoadX509KeyPair(opts.cert, opts.key) + if err != nil { + return nil, errors.Wrap(err, "could not read certificate/key") + } + cfg.Certificates = append(cfg.Certificates, cert) } - return client.New(ctx, d.InitConfig.EndpointAddr, opts...) + return cfg, nil } func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool { diff --git a/tests/dialstdio.go b/tests/dialstdio.go new file mode 100644 index 00000000000..afb441cb749 --- /dev/null +++ b/tests/dialstdio.go @@ -0,0 +1,124 @@ +package tests + +import ( + "bytes" + "context" + "net" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/docker/buildx/util/progress" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/client/llb" + gwclient "github.com/moby/buildkit/frontend/gateway/client" + "github.com/moby/buildkit/util/progress/progressui" + "github.com/moby/buildkit/util/testutil/integration" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){ + testDialStdio, +} + +func testDialStdio(t *testing.T, sb integration.Sandbox) { + do := func(t *testing.T, pipe func(t *testing.T, cmd *exec.Cmd) net.Conn) { + errBuf := bytes.NewBuffer(nil) + defer func() { + if t.Failed() { + t.Log(errBuf.String()) + } + }() + var cmd *exec.Cmd + c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + cmd = buildxCmd(sb, withArgs("dial-stdio", "--progress", "auto")) + conn := pipe(t, cmd) + cmd.Stderr = errBuf + if err := cmd.Start(); err != nil { + return nil, errors.Wrap(err, errBuf.String()) + } + + return conn, nil + })) + require.NoError(t, err) + + defer func() { + c.Close() + // Since the client is closed (and as such the connection shutdown), the buildx command should exit cleanly. + chErr := make(chan error, 1) + go func() { + chErr <- cmd.Wait() + }() + select { + case <-time.After(10 * time.Second): + t.Error("timeout waiting for buildx command to exit") + case <-chErr: + assert.NoError(t, err) + } + }() + + _, err = c.Info(sb.Context()) + require.NoError(t, err) + + require.Contains(t, errBuf.String(), "builder: "+sb.Address()) + + dir := t.TempDir() + + f, err := os.CreateTemp(dir, "log") + require.NoError(t, err) + defer f.Close() + + defer func() { + if t.Failed() { + dt, _ := os.ReadFile(f.Name()) + t.Log(string(dt)) + } + }() + + p, err := progress.NewPrinter(sb.Context(), f, progressui.AutoMode) + require.NoError(t, err) + + ch, chDone := progress.NewChannel(p) + done := func() { + select { + case <-sb.Context().Done(): + case <-chDone: + } + } + + _, err = c.Build(sb.Context(), client.SolveOpt{ + Exports: []client.ExportEntry{ + {Type: "local", OutputDir: dir}, + }, + }, "", func(ctx context.Context, gwc gwclient.Client) (*gwclient.Result, error) { + def, err := llb.Scratch().File(llb.Mkfile("hello", 0o600, []byte("world"))).Marshal(ctx) + if err != nil { + return nil, err + } + + return gwc.Solve(ctx, gwclient.SolveRequest{ + Definition: def.ToPB(), + }) + }, ch) + done() + require.NoError(t, err) + + dt, err := os.ReadFile(filepath.Join(dir, "hello")) + require.NoError(t, err) + require.Equal(t, "world", string(dt)) + } + + t.Run("conn=netpipe", func(t *testing.T) { + t.Parallel() + do(t, func(t *testing.T, cmd *exec.Cmd) net.Conn { + c1, c2 := net.Pipe() + cmd.Stdin = c1 + cmd.Stdout = c1 + return c2 + }) + }) +} diff --git a/tests/integration_test.go b/tests/integration_test.go index e38e46cab5d..49c5d06dd05 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -29,6 +29,7 @@ func TestIntegration(t *testing.T) { tests = append(tests, versionTests...) tests = append(tests, createTests...) tests = append(tests, rmTests...) + tests = append(tests, dialstdioTests...) testIntegration(t, tests...) }