Skip to content

Commit

Permalink
Add dial-stdio command
Browse files Browse the repository at this point in the history
This allows the buildx CLI to act a proxy to the configured instance.
It allows external code to use buildx itself as a driver for connecting
to buildkitd instances.

Instance and node selection should follow the same semantics as as
`buildx build`, including taking into account the `BUILDX_BUILDER` env
var and the `--builder` global flag.

Signed-off-by: Brian Goff <[email protected]>
  • Loading branch information
cpuguy83 committed Feb 4, 2024
1 parent d0c4bed commit b1ae048
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 11 deletions.
62 changes: 62 additions & 0 deletions build/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package build

import (
"context"
stderrors "errors"
"net"

"github.com/containerd/containerd/platforms"
"github.com/docker/buildx/builder"
"github.com/docker/buildx/util/progress"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer, platform *v1.Platform) (net.Conn, error) {
nodes, err := filterAvailableNodes(nodes)
if err != nil {
return nil, err
}

if len(nodes) == 0 {
return nil, errors.New("no nodes available")
}

var pls []v1.Platform
if platform != nil {
pls = []v1.Platform{*platform}
}

opts := map[string]Options{"default": {Platforms: pls}}
resolved, err := resolveDrivers(ctx, nodes, opts, pw)
if err != nil {
return nil, err
}

var dialError error
for _, ls := range resolved {
for _, rn := range ls {
if platform != nil {
p := *platform
var found bool
for _, pp := range rn.platforms {
if platforms.Only(p).Match(pp) {
found = true
break
}
}
if !found {
continue
}
}

conn, err := nodes[rn.driverIndex].Driver.Dial(ctx)
if err == nil {
return conn, nil
}
dialError = stderrors.Join(err)
}
}

return nil, errors.Wrap(dialError, "no nodes available")
}
132 changes: 132 additions & 0 deletions commands/dial_stdio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package commands

import (
"io"
"net"
"os"

"github.com/containerd/containerd/platforms"
"github.com/docker/buildx/build"
"github.com/docker/buildx/builder"
"github.com/docker/buildx/util/progress"
"github.com/docker/cli/cli/command"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/progress/progressui"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

type stdioOptions struct {
builder string
platform string
progress string
}

func runDialStdio(dockerCli command.Cli, opts stdioOptions) error {
ctx := appcontext.Context()

contextPathHash, _ := os.Getwd()
b, err := builder.New(dockerCli,
builder.WithName(opts.builder),
builder.WithContextPathHash(contextPathHash),
)
if err != nil {
return err
}

if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil {
return errors.Wrapf(err, "failed to update builder last activity time")
}
nodes, err := b.LoadNodes(ctx)
if err != nil {
return err
}

printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.DisplayMode(opts.progress), progress.WithPhase("dial-stdio"), progress.WithDesc("builder: "+b.Name, "builder:"+b.Name))
if err != nil {
return err
}

var p *v1.Platform
if opts.platform != "" {
pp, err := platforms.Parse(opts.platform)
if err != nil {
return errors.Wrapf(err, "invalid platform %q", opts.platform)
}
p = &pp
}

defer printer.Wait()

return progress.Wrap("Proxying to builder", printer.Write, func(sub progress.SubLogger) error {
var conn net.Conn

err := sub.Wrap("Dialing builder", func() error {
conn, err = build.Dial(ctx, nodes, printer, p)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}

defer conn.Close()

go func() {
<-ctx.Done()
closeWrite(conn)
}()

var eg errgroup.Group

eg.Go(func() error {
_, err := io.Copy(conn, os.Stdin)
closeWrite(conn)
return err
})
eg.Go(func() error {
_, err := io.Copy(os.Stdout, conn)
closeRead(conn)
return err
})
return eg.Wait()
})
}

func closeRead(conn net.Conn) error {
if c, ok := conn.(interface{ CloseRead() error }); ok {
return c.CloseRead()
}
return conn.Close()
}

func closeWrite(conn net.Conn) error {
if c, ok := conn.(interface{ CloseWrite() error }); ok {
return c.CloseWrite()
}
return conn.Close()
}

func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command {
opts := stdioOptions{}

cmd := &cobra.Command{
Use: "dial-stdio",
Short: "Dial stdio",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
opts.builder = rootOpts.builder
return runDialStdio(dockerCli, opts)
},
}

flags := cmd.Flags()
cmd.Flags()
flags.StringVar(&opts.platform, "platform", os.Getenv("DOCKER_DEFAULT_PLATFORM"), "Target platform: this is used for node selection")
flags.StringVar(&opts.progress, "progress", "quiet", "Set type of progress output (auto, plain, tty).")
return cmd
}
1 change: 1 addition & 0 deletions commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) {
buildCmd(dockerCli, opts, nil),
bakeCmd(dockerCli, opts),
createCmd(dockerCli),
dialStdioCmd(dockerCli, opts),
rmCmd(dockerCli, opts),
lsCmd(dockerCli),
useCmd(dockerCli, opts),
Expand Down
1 change: 1 addition & 0 deletions docs/reference/buildx.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Extended build capabilities with BuildKit
| [`build`](buildx_build.md) | Start a build |
| [`create`](buildx_create.md) | Create a new builder instance |
| [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) |
| [`dial-stdio`](buildx_dial-stdio.md) | Dial stdio |
| [`du`](buildx_du.md) | Disk usage |
| [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry |
| [`inspect`](buildx_inspect.md) | Inspect current builder instance |
Expand Down
16 changes: 16 additions & 0 deletions docs/reference/buildx_dial-stdio.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# docker buildx dial-stdio

<!---MARKER_GEN_START-->
Dial stdio

### Options

| Name | Type | Default | Description |
|:-------------|:---------|:--------|:-------------------------------------------------|
| `--builder` | `string` | | Override the configured builder instance |
| `--platform` | `string` | | Target platform: this is used for node selection |
| `--progress` | `string` | `quiet` | Set type of progress output (auto, plain, tty). |


<!---MARKER_GEN_END-->

11 changes: 9 additions & 2 deletions driver/docker-container/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
_, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"})
if err != nil {
return nil, err
}

conn = demuxConn(conn)
return conn, nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
conn, err := d.Dial(ctx)
if err != nil {
return nil, err
}

exp, _, err := detect.Exporter()
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion driver/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta)
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
opts := []client.ClientOpt{
client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta)
return d.Dial(ctx)
}), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/session", proto, meta)
}),
Expand Down
1 change: 1 addition & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Driver interface {
Version(context.Context) (string, error)
Stop(ctx context.Context, force bool) error
Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error
Dial(ctx context.Context) (net.Conn, error)
Client(ctx context.Context) (*client.Client, error)
Features(ctx context.Context) map[Feature]bool
HostGatewayIP(ctx context.Context) (net.IP, error)
Expand Down
7 changes: 5 additions & 2 deletions driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
restClient := d.clientset.CoreV1().RESTClient()
restClientConfig, err := d.KubeClientConfig.ClientConfig()
if err != nil {
Expand All @@ -208,15 +208,18 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
if err != nil {
return nil, err
}
return conn, nil
}

func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
exp, _, err := detect.Exporter()
if err != nil {
return nil, err
}

var opts []client.ClientOpt
opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return conn, nil
return d.Dial(ctx)
}))
if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td))
Expand Down
63 changes: 57 additions & 6 deletions driver/remote/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package remote

import (
"context"
"errors"
"crypto/tls"
"crypto/x509"
"net"
"os"
"strings"
"time"

"github.com/docker/buildx/driver"
"github.com/docker/buildx/util/progress"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors"
)

type Driver struct {
Expand Down Expand Up @@ -82,14 +86,61 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
opts = append(opts, client.WithTracerDelegate(td))
}

opts = append(opts, client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return d.Dial(ctx)
}))

return client.New(ctx, "", opts...)
}

func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://")
if !ok {
return nil, errors.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr)
}

dialer := &net.Dialer{}

conn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, errors.WithStack(err)
}

if d.tlsOpts != nil {
opts = append(opts, []client.ClientOpt{
client.WithServerConfig(d.tlsOpts.serverName, d.tlsOpts.caCert),
client.WithCredentials(d.tlsOpts.cert, d.tlsOpts.key),
}...)
cfg, err := loadTLS(d.tlsOpts)
if err != nil {
return nil, errors.Wrap(err, "error loading tls config")
}
conn = tls.Client(conn, cfg)
}
return conn, nil
}

func loadTLS(opts *tlsOpts) (*tls.Config, error) {
cfg := &tls.Config{
ServerName: opts.serverName,
RootCAs: x509.NewCertPool(),
}

if opts.caCert != "" {
ca, err := os.ReadFile(opts.caCert)
if err != nil {
return nil, errors.Wrap(err, "could not read ca certificate")
}
if ok := cfg.RootCAs.AppendCertsFromPEM(ca); !ok {
return nil, errors.New("failed to append ca certs")
}
}

if opts.cert != "" || opts.key != "" {
cert, err := tls.LoadX509KeyPair(opts.cert, opts.key)
if err != nil {
return nil, errors.Wrap(err, "could not read certificate/key")
}
cfg.Certificates = append(cfg.Certificates, cert)
}

return client.New(ctx, d.InitConfig.EndpointAddr, opts...)
return cfg, nil
}

func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool {
Expand Down
Loading

0 comments on commit b1ae048

Please sign in to comment.