diff --git a/README.md b/README.md index 1a6f14ca9..7afaf96b1 100644 --- a/README.md +++ b/README.md @@ -50,12 +50,27 @@ command = touch /tmp/example schedule = @hourly command = touch /tmp/example - [job-service-run "service-executed-on-new-container"] schedule = 0,20,40 * * * * image = ubuntu network = swarm_network command = touch /tmp/example + +[job-service-run "job-executed-on-existing-service"] +schedule = 0,20,40 * * * * +service = my-service +``` + +#### Docker labels configurations + +In order to use this type of configuration, ofelia needs access to the Docker socket. + +```sh +docker run -it --rm \ + -v /var/run/docker.sock:/var/run/docker.sock:ro \ + --label ofelia.job-local.my-test-job.schedule="@every 5s" \ + --label ofelia.job-local.my-test-job.command="date" \ + mcuadros/ofelia:latest daemon --docker ``` #### Docker labels configurations diff --git a/core/runservice.go b/core/runservice.go index 21c85962f..fd7977332 100644 --- a/core/runservice.go +++ b/core/runservice.go @@ -20,30 +20,62 @@ type RunServiceJob struct { Delete bool `default:"true"` Image string Network string + Service string } func NewRunServiceJob(c *docker.Client) *RunServiceJob { return &RunServiceJob{Client: c} } +// Main method for running a service-based job +// If the service has been provided it will start a new task for the existing service +// Otherwise it will create a new service based on the image and other parameters func (j *RunServiceJob) Run(ctx *Context) error { - if err := j.pullImage(); err != nil { - return err + + if j.Image != "" { + if err := j.pullImage(); err != nil { + return err + } } - svc, err := j.buildService() + var svcID string + if j.Service == "" { + svc, err := j.buildService() - if err != nil { - return err - } + if err != nil { + return err + } + + svcID = svc.ID + ctx.Logger.Noticef("Created service %s for job %s\n", svcID, j.Name) + } else { + svc, err := j.inspectService(ctx, j.Service) + if err != nil { + return err + } + svcID = svc.ID + 
ctx.Logger.Noticef("Found service %s for job %s\n", svcID, j.Name) - ctx.Logger.Noticef("Created service %s for job %s\n", svc.ID, j.Name) + _, err = j.scaleService(ctx, svcID, false) + if err != nil { + return err + } + + _, err = j.scaleService(ctx, svcID, true) + if err != nil { + return err + } + } - if err := j.watchContainer(ctx, svc.ID); err != nil { + if err := j.watchContainer(ctx, svcID); err != nil { return err } - return j.deleteService(ctx, svc.ID) + if j.Service == "" { + return j.deleteService(ctx, svcID) + } else { + return nil + } } func (j *RunServiceJob) pullImage() error { @@ -96,6 +128,48 @@ func (j *RunServiceJob) buildService() (*swarm.Service, error) { return svc, err } +// Scale an existing service one replica up or down +func (j *RunServiceJob) scaleService(ctx *Context, svcID string, up bool) (*swarm.Service, error) { + svc, err := j.inspectService(ctx, j.Service) + if err != nil { + return nil, err + } + + replicas := *svc.Spec.Mode.Replicated.Replicas + if up { + replicas += 1 + } else { + // If there are already 0 replicas of the service, there is no need to scale down + if replicas == 0 { + return svc, err + } + replicas -= 1 + } + + updateSvcOpts := docker.UpdateServiceOptions{} + + updateSvcOpts.Name = svc.Spec.Name + updateSvcOpts.Version = svc.Version.Index + + // The old spec is required, otherwise defaults will override the service + updateSvcOpts.ServiceSpec = svc.Spec + + updateSvcOpts.Mode.Replicated = + &swarm.ReplicatedService{ + Replicas: &replicas, + } + + // Do the actual scaling + err = j.Client.UpdateService(svcID, updateSvcOpts) + if err != nil { + return nil, err + } + + // Give docker the time to do the scaling + time.Sleep(time.Millisecond * 1000) + return svc, err +} + const ( // TODO are these const defined somewhere in the docker API? 
@@ -110,9 +184,9 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error { ctx.Logger.Noticef("Checking for service ID %s (%s) termination\n", svcID, j.Name) - svc, err := j.Client.InspectService(svcID) + svc, err := j.inspectService(ctx, svcID) if err != nil { - return fmt.Errorf("Failed to inspect service %s: %s", svcID, err.Error()) + return err } // On every tick, check if all the services have completed, or have error out @@ -123,12 +197,14 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error { defer wg.Done() for _ = range svcChecker.C { + // TODO will not work with longer existing services + // TODO doesn't work if svc.CreatedAt.After(time.Now().Add(maxProcessDuration)) { err = ErrMaxTimeRunning return } - taskExitCode, found := j.findtaskstatus(ctx, svc.ID) + taskExitCode, found := j.findTaskStatus(ctx, svc.ID) if found { exitCode = taskExitCode @@ -140,19 +216,28 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error { wg.Wait() ctx.Logger.Noticef("Service ID %s (%s) has completed with exit code %d\n", svcID, j.Name, exitCode) + + switch exitCode { + case 0: + return nil + case -1: + return ErrUnexpected + default: + return fmt.Errorf("error non-zero exit code: %d", exitCode) + } return err } -func (j *RunServiceJob) findtaskstatus(ctx *Context, taskID string) (int, bool) { +func (j *RunServiceJob) findTaskStatus(ctx *Context, svcID string) (int, bool) { taskFilters := make(map[string][]string) - taskFilters["service"] = []string{taskID} + taskFilters["service"] = []string{svcID} tasks, err := j.Client.ListTasks(docker.ListTasksOptions{ Filters: taskFilters, }) if err != nil { - ctx.Logger.Errorf("Failed to find task ID %s. Considering the task terminated: %s\n", taskID, err.Error()) + ctx.Logger.Errorf("Failed to find tasks fo service %s. 
Considering the task terminated: %s\n", svcID, err.Error()) return 0, false } @@ -186,6 +271,20 @@ func (j *RunServiceJob) findtaskstatus(ctx *Context, taskID string) (int, bool) if exitCode == 0 && task.Status.State == swarm.TaskStateRejected { exitCode = 255 // force non-zero exit for task rejected } + + err = j.Client.GetServiceLogs(docker.LogsServiceOptions{ + Service: svcID, + Stderr: true, + Stdout: true, + Follow: false, + ErrorStream: ctx.Execution.ErrorStream, + OutputStream: ctx.Execution.OutputStream, + }) + if err != nil { + ctx.Logger.Errorf("Error getting logs for service: %s - %s \n", svcID, err.Error()) + return 0, false + } + done = true break } @@ -211,3 +310,19 @@ func (j *RunServiceJob) deleteService(ctx *Context, svcID string) error { return err } + +// Convenience method for inspecting a service +func (j *RunServiceJob) inspectService(ctx *Context, svcID string) (*swarm.Service, error) { + var err error + var svc *swarm.Service + if j.Service == "" { + svc, err = j.Client.InspectService(svcID) + } else { + svc, err = j.Client.InspectService(j.Service) + } + + if err != nil { + return nil, fmt.Errorf("Failed to inspect service %s: %s", j.Service, err.Error()) + } + return svc, err +}