Skip to content

Commit

Permalink
docker_client: don't pull image and create container per tc command
Browse files Browse the repository at this point in the history
  • Loading branch information
w-miller committed Aug 14, 2023
1 parent e1cb30a commit 8a6cac5
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 123 deletions.
26 changes: 20 additions & 6 deletions pkg/container/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,21 +570,22 @@ func TestNetemContainerDportFilter_Success(t *testing.T) {
engineClient.AssertExpectations(t)
}

func Test_tcContainerCommand(t *testing.T) {
func Test_tcContainerCommands(t *testing.T) {
c := &Container{
ContainerInfo: DetailsResponse(AsMap("ID", "targetID")),
}

config := container.Config{
Labels: map[string]string{"com.gaiaadm.pumba.skip": "true"},
Entrypoint: []string{"tc"},
Cmd: []string{"test", "me"},
Image: "pumba/tcimage",
// Used as long-running entry-point to keep container alive between commands
Cmd: []string{"monitor"},
Image: "pumba/tcimage",
}
// host config
hconfig := container.HostConfig{
// auto remove container on tc command exit
AutoRemove: true,
// Don't auto-remove, since we may want to run multiple commands
AutoRemove: false,
// NET_ADMIN is required for "tc netem"
CapAdd: []string{"NET_ADMIN"},
// use target container network stack
Expand Down Expand Up @@ -621,8 +622,21 @@ func Test_tcContainerCommand(t *testing.T) {
// start container
engineClient.On("ContainerStart", ctx, "tcID", types.ContainerStartOptions{}).Return(nil)

// create exec for first command
engineClient.On("ContainerExecCreate", ctx, "tcID", types.ExecConfig{Cmd: []string{"tc", "test", "one"}}).Return(types.IDResponse{ID: "execID1"}, nil)
// start exec for first command
engineClient.On("ContainerExecStart", ctx, "execID1", types.ExecStartCheck{}).Return(nil)

// create exec for second command
engineClient.On("ContainerExecCreate", ctx, "tcID", types.ExecConfig{Cmd: []string{"tc", "test", "two"}}).Return(types.IDResponse{ID: "execID2"}, nil)
// start exec for second command
engineClient.On("ContainerExecStart", ctx, "execID2", types.ExecStartCheck{}).Return(nil)

// remove container
engineClient.On("ContainerRemove", ctx, "tcID", types.ContainerRemoveOptions{Force: true}).Return(nil)

client := dockerClient{containerAPI: engineClient, imageAPI: engineClient}
err := client.tcContainerCommand(context.TODO(), c, []string{"test", "me"}, "pumba/tcimage", true)
err := client.tcContainerCommands(context.TODO(), c, [][]string{{"test", "one"}, {"test", "two"}}, "pumba/tcimage", true)

assert.NoError(t, err)
engineClient.AssertExpectations(t)
Expand Down
224 changes: 107 additions & 117 deletions pkg/container/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (client dockerClient) startNetemContainer(ctx context.Context, c *Container
// stop disruption command
// netemStopCommand := "tc qdisc del dev eth0 root netem"
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
return client.tcCommand(ctx, c, netemCommand, tcimage, pull)
return client.tcCommands(ctx, c, [][]string{netemCommand}, tcimage, pull)
}
return nil
}
Expand All @@ -388,54 +388,37 @@ func (client dockerClient) stopNetemContainer(ctx context.Context, c *Container,
"dryrun": dryrun,
}).Debug("stop netem for container")
if !dryrun {
var netemCommands [][]string
if len(ips) != 0 || len(sports) != 0 || len(dports) != 0 {
// delete qdisc 'parent 1:1 handle 10:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand := []string{"qdisc", "del", "dev", netInterface, "parent", "1:1", "handle", "10:"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err := client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'parent 1:1 handle 10:'")
}
// delete qdisc 'parent 1:2 handle 20:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand = []string{"qdisc", "del", "dev", netInterface, "parent", "1:2", "handle", "20:"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'parent 1:2 handle 20:'")
}
// delete qdisc 'parent 1:3 handle 30:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand = []string{"qdisc", "del", "dev", netInterface, "parent", "1:3", "handle", "30:"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'parent 1:3 handle 30:'")
}
// delete qdisc 'root handle 1: prio'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand = []string{"qdisc", "del", "dev", netInterface, "root", "handle", "1:", "prio"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to delete qdisc 'root handle 1: prio'")
netemCommands = [][]string{
// delete qdisc 'parent 1:1 handle 10:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "parent", "1:1", "handle", "10:"},
// delete qdisc 'parent 1:2 handle 20:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "parent", "1:2", "handle", "20:"},
// delete qdisc 'parent 1:3 handle 30:'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "parent", "1:3", "handle", "30:"},
// delete qdisc 'root handle 1: prio'
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "root", "handle", "1:", "prio"},
}
} else {
// stop netem command
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
netemCommand := []string{"qdisc", "del", "dev", netInterface, "root", "netem"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("deleting netem qdisc")
err := client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to stop netem")
netemCommands = [][]string{
// stop netem command
// http://www.linuxfoundation.org/collaborate/workgroups/networking/netem
{"qdisc", "del", "dev", netInterface, "root", "netem"},
}
}
err := client.tcCommands(ctx, c, netemCommands, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to run netem tc commands")
}
}
return nil
}

//nolint:funlen
func (client dockerClient) startNetemContainerIPFilter(ctx context.Context, c *Container, netInterface string, netemCmd []string,
ips []*net.IPNet, sports []string, dports []string, tcimage string, pull bool, dryrun bool) error {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -465,113 +448,98 @@ func (client dockerClient) startNetemContainerIPFilter(ctx context.Context, c *C
// sfq sfq netem
// band 0 1 2

// Create a priority-based queue. This *instantly* creates classes 1:1, 1:2, 1:3
// 'tc qdisc add dev <netInterface> root handle 1: prio'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
handleCommand := []string{"qdisc", "add", "dev", netInterface, "root", "handle", "1:", "prio"}
log.WithField("netem", strings.Join(handleCommand, " ")).Debug("adding netem qdisc")
err := client.tcCommand(ctx, c, handleCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to create a priority-based queue")
}

// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:1 class.
// 'tc qdisc add dev <netInterface> parent 1:1 handle 10: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
netemCommand := []string{"qdisc", "add", "dev", netInterface, "parent", "1:1", "handle", "10:", "sfq"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to create Stochastic Fairness Queueing (sfq) queueing discipline for 1:1 class")
}

// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:2 class
// 'tc qdisc add dev <netInterface> parent 1:2 handle 20: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
netemCommand = []string{"qdisc", "add", "dev", netInterface, "parent", "1:2", "handle", "20:", "sfq"}
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to create Stochastic Fairness Queueing (sfq) queueing discipline for 1:2 class")
}

// Add queueing discipline for 1:3 class. No traffic is going through 1:3 yet
// 'tc qdisc add dev <netInterface> parent 1:3 handle 30: netem <netemCmd>'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
netemCommand = append([]string{"qdisc", "add", "dev", netInterface, "parent", "1:3", "handle", "30:", "netem"}, netemCmd...)
log.WithField("netem", strings.Join(netemCommand, " ")).Debug("adding netem qdisc")
err = client.tcCommand(ctx, c, netemCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to add queueing discipline for 1:3 class")
commands := [][]string{
// Create a priority-based queue. This *instantly* creates classes 1:1, 1:2, 1:3
// 'tc qdisc add dev <netInterface> root handle 1: prio'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
{"qdisc", "add", "dev", netInterface, "root", "handle", "1:", "prio"},
// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:1 class.
// 'tc qdisc add dev <netInterface> parent 1:1 handle 10: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
{"qdisc", "add", "dev", netInterface, "parent", "1:1", "handle", "10:", "sfq"},
// Create Stochastic Fairness Queueing (sfq) queueing discipline for 1:2 class
// 'tc qdisc add dev <netInterface> parent 1:2 handle 20: sfq'
// See more: https://linux.die.net/man/8/tc-sfq
{"qdisc", "add", "dev", netInterface, "parent", "1:2", "handle", "20:", "sfq"},
// Add queueing discipline for 1:3 class. No traffic is going through 1:3 yet
// 'tc qdisc add dev <netInterface> parent 1:3 handle 30: netem <netemCmd>'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
append([]string{"qdisc", "add", "dev", netInterface, "parent", "1:3", "handle", "30:", "netem"}, netemCmd...),
}

// # redirect traffic to specific IP through band 3
// 'tc filter add dev <netInterface> protocol ip parent 1:0 prio 1 u32 match ip dst <targetIP> flowid 1:3'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
for _, ip := range ips {
filterCommand := []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dst", ip.String(), "flowid", "1:3"}
log.WithField("netem", strings.Join(filterCommand, " ")).Debug("adding netem IP filter")
err = client.tcCommand(ctx, c, filterCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to redirect traffic to specific IP through band 3")
}
commands = append(commands, []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dst", ip.String(), "flowid", "1:3"})
}

// # redirect traffic to specific sport through band 3
// 'tc filter add dev <netInterface> protocol ip parent 1:0 prio 1 u32 match ip <s/d>port <targetPort> 0xffff flowid 1:3'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
for _, sport := range sports {
filterPortCommand := []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "sport", sport, "0xffff", "flowid", "1:3"}
log.WithField("netem", strings.Join(filterPortCommand, " ")).Debug("adding netem port filter")
err = client.tcCommand(ctx, c, filterPortCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to redirect traffic from port "+sport+" through band 3")
}
commands = append(commands, []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "sport", sport, "0xffff", "flowid", "1:3"})
}

// # redirect traffic to specific dport through band 3
// 'tc filter add dev <netInterface> protocol ip parent 1:0 prio 1 u32 match ip <s/d>port <targetPort> 0xffff flowid 1:3'
// See more: http://man7.org/linux/man-pages/man8/tc-netem.8.html
for _, dport := range dports {
filterPortCommand := []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dport", dport, "0xffff", "flowid", "1:3"}
log.WithField("netem", strings.Join(filterPortCommand, " ")).Debug("adding netem port filter")
err = client.tcCommand(ctx, c, filterPortCommand, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to redirect traffic to port "+dport+" through band 3")
}
commands = append(commands, []string{"filter", "add", "dev", netInterface, "protocol", "ip", "parent", "1:0", "prio", "1",
"u32", "match", "ip", "dport", dport, "0xffff", "flowid", "1:3"})
}

err := client.tcCommands(ctx, c, commands, tcimage, pull)
if err != nil {
return errors.Wrap(err, "failed to run tc commands")
}
}
return nil
}

func (client dockerClient) tcCommand(ctx context.Context, c *Container, args []string, tcimage string, pull bool) error {
func (client dockerClient) tcCommands(ctx context.Context, c *Container, argsList [][]string, tcimage string, pull bool) error {
if tcimage == "" {
return client.execOnContainer(ctx, c, "tc", args, true)
for _, args := range argsList {
if err := client.execOnContainer(ctx, c, "tc", args, true); err != nil {
return errors.Wrapf(err, "error running tc command on container: %v", strings.Join(args, " "))
}
}
return nil
}
return client.tcContainerCommand(ctx, c, args, tcimage, pull)
return client.tcContainerCommands(ctx, c, argsList, tcimage, pull)
}

// execute tc command using other container (with iproute2 package installed), using target container network stack
func (client dockerClient) tcExecCommand(ctx context.Context, execID string, args []string) error {
execConfig := types.ExecConfig{
Cmd: append([]string{"tc"}, args...),
}
execCreateResponse, err := client.containerAPI.ContainerExecCreate(ctx, execID, execConfig)
if err != nil {
return errors.Wrap(err, "failed to create tc-container exec")
}
if err = client.containerAPI.ContainerExecStart(ctx, execCreateResponse.ID, types.ExecStartCheck{}); err != nil {
return errors.Wrap(err, "failed to start tc-container exec")
}
log.WithField("args", strings.Join(args, " ")).Debug("run command on tc-container")
return nil
}

// execute tc commands using other container (with iproute2 package installed), using target container network stack
// try to use `gaiadocker\iproute2` image (Alpine + iproute2 package)
func (client dockerClient) tcContainerCommand(ctx context.Context, target *Container, args []string, tcimage string, pull bool) error {
func (client dockerClient) tcContainerCommands(ctx context.Context, target *Container, argsList [][]string, tcimage string, pull bool) error {
log.WithFields(log.Fields{
"container": target.ID(),
"tc-image": tcimage,
"pull": pull,
"args": args,
"args-list": argsList,
}).Debug("executing tc command in a separate container joining target container network namespace")
// container config
config := ctypes.Config{
Labels: map[string]string{"com.gaiaadm.pumba.skip": "true"},
Entrypoint: []string{"tc"},
Cmd: args,
Image: tcimage,
}

// host config
hconfig := ctypes.HostConfig{
// auto remove container on tc command exit
AutoRemove: true,
// Don't auto-remove, since we may want to run multiple commands
AutoRemove: false,
// NET_ADMIN is required for "tc netem"
CapAdd: []string{"NET_ADMIN"},
// use target container network stack
Expand All @@ -585,8 +553,8 @@ func (client dockerClient) tcContainerCommand(ctx context.Context, target *Conta
log.WithField("network", hconfig.NetworkMode).Debug("network mode")
// pull docker image if required: can pull only public images
if pull {
log.WithField("image", config.Image).Debug("pulling tc-image")
events, err := client.imageAPI.ImagePull(ctx, config.Image, types.ImagePullOptions{})
log.WithField("image", tcimage).Debug("pulling tc-image")
events, err := client.imageAPI.ImagePull(ctx, tcimage, types.ImagePullOptions{})
if err != nil {
return errors.Wrap(err, "failed to pull tc-image")
}
Expand All @@ -603,8 +571,19 @@ func (client dockerClient) tcContainerCommand(ctx context.Context, target *Conta
log.Debug(pullResponse)
}
}
log.WithField("image", config.Image).Debug("creating tc-container")

// container config
config := ctypes.Config{
Labels: map[string]string{"com.gaiaadm.pumba.skip": "true"},
Entrypoint: []string{"tc"},
// Used as long-running entry-point to keep container alive between commands
Cmd: []string{"monitor"},
Image: tcimage,
}

createResponse, err := client.containerAPI.ContainerCreate(ctx, &config, &hconfig, nil, nil, "")

log.WithField("image", config.Image).Debug("creating tc-container")
if err != nil {
return errors.Wrap(err, "failed to create tc-container from tc-image")
}
Expand All @@ -613,6 +592,17 @@ func (client dockerClient) tcContainerCommand(ctx context.Context, target *Conta
if err != nil {
return errors.Wrap(err, "failed to start tc-container")
}

for _, args := range argsList {
if err = client.tcExecCommand(ctx, createResponse.ID, args); err != nil {
return errors.Wrapf(err, "error running tc command on container: %v", strings.Join(args, " "))
}
}

if err = client.containerAPI.ContainerRemove(ctx, createResponse.ID, types.ContainerRemoveOptions{Force: true}); err != nil {
return errors.Wrap(err, "failed to remove tc-container")
}

return nil
}

Expand Down

0 comments on commit 8a6cac5

Please sign in to comment.