Skip to content

Commit

Permalink
resolve service reference into container based on observed state
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas De Loof <[email protected]>
  • Loading branch information
ndeloof committed Sep 14, 2023
1 parent c0b8d34 commit 273c702
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 61 deletions.
141 changes: 81 additions & 60 deletions pkg/compose/convergence.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,71 +102,13 @@ func (c *convergence) apply(ctx context.Context, project *types.Project, options
if utils.StringContains(options.Services, name) {
strategy = options.Recreate
}
err = c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout)
if err != nil {
return err
}

c.updateProject(project, name)
return nil
return c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout)
})(ctx)
})
}

var mu sync.Mutex

// updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
func (c *convergence) updateProject(project *types.Project, serviceName string) {
// operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
mu.Lock()
defer mu.Unlock()

cnts := c.getObservedState(serviceName)
for i, s := range project.Services {
updateServices(&s, cnts)
project.Services[i] = s
}
}

func updateServices(service *types.ServiceConfig, cnts Containers) {
if len(cnts) == 0 {
return
}

for _, str := range []*string{&service.NetworkMode, &service.Ipc, &service.Pid} {
if d := getDependentServiceFromMode(*str); d != "" {
if serviceContainers := cnts.filter(isService(d)); len(serviceContainers) > 0 {
*str = types.NetworkModeContainerPrefix + serviceContainers[0].ID
}
}
}
var links []string
for _, serviceLink := range service.Links {
parts := strings.Split(serviceLink, ":")
serviceName := serviceLink
serviceAlias := ""
if len(parts) == 2 {
serviceName = parts[0]
serviceAlias = parts[1]
}
if serviceName != service.Name {
links = append(links, serviceLink)
continue
}
for _, container := range cnts {
name := getCanonicalContainerName(container)
if serviceAlias != "" {
links = append(links,
fmt.Sprintf("%s:%s", name, serviceAlias))
}
links = append(links,
fmt.Sprintf("%s:%s", name, name),
fmt.Sprintf("%s:%s", name, getContainerNameWithoutProject(container)))
}
service.Links = links
}
}

func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error {
expected, err := getScale(service)
if err != nil {
Expand All @@ -178,7 +120,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,

eg, _ := errgroup.WithContext(ctx)

err = c.resolveVolumeFrom(&service)
err = c.resolveServiceReferences(&service)
if err != nil {
return err
}
Expand Down Expand Up @@ -263,6 +205,25 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
return err
}

// resolveServiceReferences replaces reference to another service with reference to an actual container
func (c *convergence) resolveServiceReferences(service *types.ServiceConfig) error {
err := c.resolveVolumeFrom(service)
if err != nil {
return err
}

err = c.resolveLinks(service)
if err != nil {
return err
}

err = c.resolveSharedNamespaces(service)
if err != nil {
return err
}
return nil
}

func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error {
for i, vol := range service.VolumesFrom {
spec := strings.Split(vol, ":")
Expand All @@ -283,6 +244,66 @@ func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error {
return nil
}

func (c *convergence) resolveLinks(service *types.ServiceConfig) error {
var links []string
for _, serviceLink := range service.Links {
parts := strings.Split(serviceLink, ":")
serviceName := serviceLink
serviceAlias := ""
if len(parts) == 2 {
serviceName = parts[0]
serviceAlias = parts[1]
}
dependencies := c.getObservedState(serviceName)
if len(dependencies) == 0 {
return fmt.Errorf("cannot link to service %s: container missing", serviceName)
}
for _, container := range dependencies {
name := getCanonicalContainerName(container)
if serviceAlias != "" {
links = append(links,
fmt.Sprintf("%s:%s", name, serviceAlias))
}
links = append(links,
fmt.Sprintf("%s:%s", name, name),
fmt.Sprintf("%s:%s", name, getContainerNameWithoutProject(container)))
}
}
service.Links = links
return nil
}

func (c *convergence) resolveSharedNamespaces(service *types.ServiceConfig) error {
str := service.NetworkMode
if name := getDependentServiceFromMode(str); name != "" {
dependencies := c.getObservedState(name)
if len(dependencies) == 0 {
return fmt.Errorf("cannot share network namespace with service %s: container missing", name)
}
service.NetworkMode = types.ContainerPrefix + dependencies.sorted()[0].ID
}

str = service.Ipc
if name := getDependentServiceFromMode(str); name != "" {
dependencies := c.getObservedState(name)
if len(dependencies) == 0 {
return fmt.Errorf("cannot share IPC namespace with service %s: container missing", name)
}
service.Ipc = types.ContainerPrefix + dependencies.sorted()[0].ID
}

str = service.Pid
if name := getDependentServiceFromMode(str); name != "" {
dependencies := c.getObservedState(name)
if len(dependencies) == 0 {
return fmt.Errorf("cannot share PID namespace with service %s: container missing", name)
}
service.Pid = types.ContainerPrefix + dependencies.sorted()[0].ID
}

return nil
}

func mustRecreate(expected types.ServiceConfig, actual moby.Container, policy string) (bool, error) {
if policy == api.RecreateNever {
return false, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/compose/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project,
if err != nil {
return "", err
}
updateServices(&service, observedState)

if !opts.NoDeps {
if err := s.waitDependencies(ctx, project, service.DependsOn, observedState); err != nil {
Expand All @@ -104,6 +103,11 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project,
Labels: mergeLabels(service.Labels, service.CustomLabels),
}

err = newConvergence(project.ServiceNames(), observedState, s).resolveServiceReferences(&service)
if err != nil {
return "", err
}

created, err := s.createContainer(ctx, project, service, service.ContainerName, 1, createOpts)
if err != nil {
return "", err
Expand Down
8 changes: 8 additions & 0 deletions pkg/e2e/fixtures/no-deps/links.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
app:
image: nginx:alpine
links:
- db

db:
image: nginx:alpine
7 changes: 7 additions & 0 deletions pkg/e2e/fixtures/no-deps/network-mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
services:
app:
image: nginx:alpine
network_mode: service:db

db:
image: nginx:alpine
34 changes: 34 additions & 0 deletions pkg/e2e/noDeps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,37 @@ func TestNoDepsVolumeFrom(t *testing.T) {
res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/no-deps/volume-from.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "cannot share volume with service db: container missing"})
}

func TestNoDepsLinks(t *testing.T) {
c := NewParallelCLI(t)
const projectName = "e2e-no-deps-links"
t.Cleanup(func() {
c.RunDockerComposeCmd(t, "--project-name", projectName, "down")
})

c.RunDockerComposeCmd(t, "-f", "fixtures/no-deps/links.yaml", "--project-name", projectName, "up", "-d")

c.RunDockerComposeCmd(t, "-f", "fixtures/no-deps/links.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")

c.RunDockerCmd(t, "rm", "-f", fmt.Sprintf("%s-db-1", projectName))

res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/no-deps/links.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "cannot link to service db: container missing"})
}

func TestNoDepsNetworkMode(t *testing.T) {
c := NewParallelCLI(t)
const projectName = "e2e-no-deps-network-mode"
t.Cleanup(func() {
c.RunDockerComposeCmd(t, "--project-name", projectName, "down")
})

c.RunDockerComposeCmd(t, "-f", "fixtures/no-deps/network-mode.yaml", "--project-name", projectName, "up", "-d")

c.RunDockerComposeCmd(t, "-f", "fixtures/no-deps/network-mode.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")

c.RunDockerCmd(t, "rm", "-f", fmt.Sprintf("%s-db-1", projectName))

res := c.RunDockerComposeCmdNoCheck(t, "-f", "fixtures/no-deps/network-mode.yaml", "--project-name", projectName, "up", "--no-deps", "-d", "app")
res.Assert(t, icmd.Expected{ExitCode: 1, Err: "cannot share network namespace with service db: container missing"})
}

0 comments on commit 273c702

Please sign in to comment.