From e93a88e00cebf06bbdeab81cac1300a73a8e0f79 Mon Sep 17 00:00:00 2001 From: Ramiro del Corro Date: Sun, 20 Dec 2020 15:08:43 -0800 Subject: [PATCH] Applied changes from mcuadros/ofelia#137 --- Dockerfile | 4 +- README.md | 44 ++++- cli/config.go | 232 +++++++++++++++++-------- cli/config_test.go | 20 ++- cli/daemon.go | 35 ++-- cli/docker-labels.go | 46 +---- cli/docker_config_handler.go | 105 +++++++++++ cli/validate.go | 23 +-- core/{job.go => bare_job.go} | 23 ++- core/{job_test.go => bare_job_test.go} | 0 core/common.go | 31 ++++ core/common_test.go | 1 - core/cron_utils.go | 18 ++ core/execjob.go | 6 +- core/scheduler.go | 32 ++-- core/scheduler_test.go | 2 - docs/jobs.md | 21 +-- go.mod | 2 +- go.sum | 4 +- ofelia.go | 17 +- test/docker-compose.yml | 17 -- 21 files changed, 451 insertions(+), 232 deletions(-) create mode 100644 cli/docker_config_handler.go rename core/{job.go => bare_job.go} (55%) rename core/{job_test.go => bare_job_test.go} (100%) create mode 100644 core/cron_utils.go delete mode 100644 test/docker-compose.yml diff --git a/Dockerfile b/Dockerfile index 29b125272..1ffd2f77f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.17.1-alpine AS builder +FROM golang:1.18.1-alpine3.15 AS builder RUN apk --no-cache add gcc musl-dev @@ -7,7 +7,7 @@ COPY . ${GOPATH}/src/github.com/mcuadros/ofelia RUN go build -o /go/bin/ofelia . -FROM alpine:3.14.2 +FROM alpine:3.16.0 # this label is required to identify container with ofelia running LABEL ofelia.service=true diff --git a/README.md b/README.md index 205d4b260..fbea5b178 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,6 @@ In order to use this type of configurations, ofelia need access to docker socket ```sh docker run -it --rm \ -v /var/run/docker.sock:/var/run/docker.sock:ro \ - --label ofelia.job-local.my-test-job.schedule="@every 5s" \ - --label ofelia.job-local.my-test-job.command="date" \ mcuadros/ofelia:latest daemon --docker ``` @@ -86,9 +84,8 @@ docker run -it --rm \ nginx ``` -Now if we start `ofelia` container with the command provided above, it will pickup 2 jobs: +Now if we start `ofelia` container with the command provided above, it will execute the task: -- Local - `date` - Exec - `uname -a` Or with docker-compose: @@ -103,9 +100,6 @@ services: command: daemon --docker volumes: - /var/run/docker.sock:/var/run/docker.sock:ro - labels: - ofelia.job-local.my-test-job.schedule: "@every 5s" - ofelia.job-local.my-test-job.command: "date" nginx: image: nginx @@ -115,6 +109,42 @@ services: ofelia.job-exec.datecron.command: "uname -a" ``` +#### Dynamic docker configuration + +You can start ofelia in its own container or on the host itself, and it will magically pick up any container that starts, stops or is modified on the fly. +In order to achieve this, you simply have to use docker containers with the labels described above and let ofelia take care of the rest. + +#### Hybrid configuration (INI files + Docker) + +You can specify part of the configuration on the INI files, such as globals for the middlewares or even declare tasks in there but also merge them with docker. +The docker labels will be parsed, added and removed on the fly but also, the file config can be used. + +**Use the INI file to:** + +- Configure the slack or other middleware integration +- Configure any global setting +- Create a job-run so it executes on a new container each time + +```ini +[global] +slack-webhook = https://myhook.com/auth + +[job-run "job-executed-on-new-container"] +schedule = @hourly +image = ubuntu:latest +command = touch /tmp/example +``` + +**Use docker to:** + +```sh +docker run -it --rm \ + --label ofelia.enabled=true \ + --label ofelia.job-exec.test-exec-job.schedule="@every 5s" \ + --label ofelia.job-exec.test-exec-job.command="uname -a" \ + nginx +``` + ### Logging **Ofelia** comes with three different logging drivers that can be configured in the `[global]` section: - `mail` to send mails diff --git a/cli/config.go b/cli/config.go index c60bb2b91..c4be8741b 100644 --- a/cli/config.go +++ b/cli/config.go @@ -1,12 +1,8 @@ package cli import ( - "os" - - docker "github.com/fsouza/go-dockerclient" "github.com/mcuadros/ofelia/core" "github.com/mcuadros/ofelia/middlewares" - logging "github.com/op/go-logging" defaults "github.com/mcuadros/go-defaults" gcfg "gopkg.in/gcfg.v1" @@ -20,8 +16,6 @@ const ( jobLocal = "job-local" ) -var IsDockerEnv bool - // Config contains the configuration type Config struct { Global struct { @@ -29,123 +23,215 @@ type Config struct { middlewares.SaveConfig `mapstructure:",squash"` middlewares.MailConfig `mapstructure:",squash"` } - ExecJobs map[string]*ExecJobConfig `gcfg:"job-exec" mapstructure:"job-exec,squash"` - RunJobs map[string]*RunJobConfig `gcfg:"job-run" mapstructure:"job-run,squash"` - ServiceJobs map[string]*RunServiceConfig `gcfg:"job-service-run" mapstructure:"job-service-run,squash"` - LocalJobs map[string]*LocalJobConfig `gcfg:"job-local" mapstructure:"job-local,squash"` + ExecJobs map[string]*ExecJobConfig `gcfg:"job-exec" mapstructure:"job-exec,squash"` + RunJobs map[string]*RunJobConfig `gcfg:"job-run" mapstructure:"job-run,squash"` + ServiceJobs map[string]*RunServiceConfig `gcfg:"job-service-run" mapstructure:"job-service-run,squash"` + LocalJobs map[string]*LocalJobConfig `gcfg:"job-local" mapstructure:"job-local,squash"` + sh *core.Scheduler + dockerHandler *DockerHandler + logger core.Logger } -// BuildFromDockerLabels builds a scheduler using the config from a docker labels -func BuildFromDockerLabels() (*core.Scheduler, error) { +func NewConfig(logger core.Logger) *Config { + // Initialize c := &Config{} - - d, err := c.buildDockerClient() - if err != nil { - return nil, err - } - - labels, err := getLabels(d) - if err != nil { - return nil, err - } - - if err := c.buildFromDockerLabels(labels); err != nil { - return nil, err - } - - return c.build() + c.ExecJobs = make(map[string]*ExecJobConfig) + c.RunJobs = make(map[string]*RunJobConfig) + c.ServiceJobs = make(map[string]*RunServiceConfig) + c.LocalJobs = make(map[string]*LocalJobConfig) + c.logger = logger + defaults.SetDefaults(c) + return c } // BuildFromFile builds a scheduler using the config from a file -func BuildFromFile(filename string) (*core.Scheduler, error) { - c := &Config{} - if err := gcfg.ReadFileInto(c, filename); err != nil { - return nil, err - } - - return c.build() +func BuildFromFile(filename string, logger core.Logger) (*Config, error) { + c := NewConfig(logger) + err := gcfg.ReadFileInto(c, filename) + return c, err } // BuildFromString builds a scheduler using the config from a string -func BuildFromString(config string) (*core.Scheduler, error) { - c := &Config{} +func BuildFromString(config string, logger core.Logger) (*Config, error) { + c := NewConfig(logger) if err := gcfg.ReadStringInto(c, config); err != nil { return nil, err } - - return c.build() + return c, nil } -func (c *Config) build() (*core.Scheduler, error) { - defaults.SetDefaults(c) +// Call this only once at app init +func (c *Config) InitializeApp() error { + c.sh = core.NewScheduler(c.logger) + c.buildSchedulerMiddlewares(c.sh) - d, err := c.buildDockerClient() + var err error + c.dockerHandler, err = NewDockerHandler(c, c.logger) if err != nil { - return nil, err + return err } - sh := core.NewScheduler(c.buildLogger()) - c.buildSchedulerMiddlewares(sh) + // In order to support non dynamic job types such as Local or Run using labels + // lets parse the labels and merge the job lists + dockerLabels, err := c.dockerHandler.GetDockerLabels() + var parsedLabelConfig Config + parsedLabelConfig.buildFromDockerLabels(dockerLabels) + for name, j := range parsedLabelConfig.RunJobs { + c.RunJobs[name] = j + } + for name, j := range parsedLabelConfig.LocalJobs { + c.LocalJobs[name] = j + } + for name, j := range parsedLabelConfig.ServiceJobs { + c.ServiceJobs[name] = j + } for name, j := range c.ExecJobs { defaults.SetDefaults(j) - - j.Client = d + j.Client = c.dockerHandler.GetInternalDockerClient() j.Name = name j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } for name, j := range c.RunJobs { defaults.SetDefaults(j) - - j.Client = d + j.Client = c.dockerHandler.GetInternalDockerClient() j.Name = name j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } for name, j := range c.LocalJobs { defaults.SetDefaults(j) - j.Name = name j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } for name, j := range c.ServiceJobs { defaults.SetDefaults(j) j.Name = name - j.Client = d + j.Client = c.dockerHandler.GetInternalDockerClient() j.buildMiddlewares() - sh.AddJob(j) + c.sh.AddJob(j) } - return sh, nil + return nil } -func (c *Config) buildDockerClient() (*docker.Client, error) { - d, err := docker.NewClientFromEnv() - if err != nil { - return nil, err +func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) { + sh.Use(middlewares.NewSlack(&c.Global.SlackConfig)) + sh.Use(middlewares.NewSave(&c.Global.SaveConfig)) + sh.Use(middlewares.NewMail(&c.Global.MailConfig)) +} + +func (c *Config) dockerLabelsUpdate(labels map[string]map[string]string) { + // Get the current labels + var parsedLabelConfig Config + parsedLabelConfig.buildFromDockerLabels(labels) + + // Calculate the delta execJobs + for name, j := range c.ExecJobs { + found := false + for newJobsName, newJob := range parsedLabelConfig.ExecJobs { + // Check if the schedule has changed + if name == newJobsName { + found = true + // There is a slight race condition were a job can be canceled / restarted with different params + // so, lets take care of it by simply restarting + // For the hash to work properly, we must fill the fields before calling it + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + if newJob.Hash() != j.Hash() { + // Remove from the scheduler + c.sh.RemoveJob(j) + // Add the job back to the scheduler + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + // Update the job config + c.ExecJobs[name] = newJob + } + break + } + } + if !found { + // Remove the job + c.sh.RemoveJob(j) + delete(c.ExecJobs, name) + } } - return d, nil -} + // Check for aditions + for newJobsName, newJob := range parsedLabelConfig.ExecJobs { + found := false + for name := range c.ExecJobs { + if name == newJobsName { + found = true + break + } + } + if !found { + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + c.ExecJobs[newJobsName] = newJob + } + } -func (c *Config) buildLogger() core.Logger { - stdout := logging.NewLogBackend(os.Stdout, "", 0) - // Set the backends to be used. - logging.SetBackend(stdout) - logging.SetFormatter(logging.MustStringFormatter(logFormat)) + for name, j := range c.RunJobs { + found := false + for newJobsName, newJob := range parsedLabelConfig.RunJobs { + // Check if the schedule has changed + if name == newJobsName { + found = true + // There is a slight race condition were a job can be canceled / restarted with different params + // so, lets take care of it by simply restarting + // For the hash to work properly, we must fill the fields before calling it + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + if newJob.Hash() != j.Hash() { + // Remove from the scheduler + c.sh.RemoveJob(j) + // Add the job back to the scheduler + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + // Update the job config + c.RunJobs[name] = newJob + } + break + } + } + if !found { + // Remove the job + c.sh.RemoveJob(j) + delete(c.RunJobs, name) + } + } - return logging.MustGetLogger("ofelia") -} + // Check for aditions + for newJobsName, newJob := range parsedLabelConfig.RunJobs { + found := false + for name := range c.RunJobs { + if name == newJobsName { + found = true + break + } + } + if !found { + defaults.SetDefaults(newJob) + newJob.Client = c.dockerHandler.GetInternalDockerClient() + newJob.Name = newJobsName + newJob.buildMiddlewares() + c.sh.AddJob(newJob) + c.RunJobs[newJobsName] = newJob + } + } -func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) { - sh.Use(middlewares.NewSlack(&c.Global.SlackConfig)) - sh.Use(middlewares.NewSave(&c.Global.SaveConfig)) - sh.Use(middlewares.NewMail(&c.Global.MailConfig)) } // ExecJobConfig contains all configuration params needed to build a ExecJob diff --git a/cli/config_test.go b/cli/config_test.go index 8c42818ba..28f6b10f0 100644 --- a/cli/config_test.go +++ b/cli/config_test.go @@ -15,8 +15,17 @@ type SuiteConfig struct{} var _ = Suite(&SuiteConfig{}) +type TestLogger struct{} + +func (*TestLogger) Criticalf(format string, args ...interface{}) {} +func (*TestLogger) Debugf(format string, args ...interface{}) {} +func (*TestLogger) Errorf(format string, args ...interface{}) {} +func (*TestLogger) Noticef(format string, args ...interface{}) {} +func (*TestLogger) Warningf(format string, args ...interface{}) {} + func (s *SuiteConfig) TestBuildFromString(c *C) { - sh, err := BuildFromString(` + mockLogger := TestLogger{} + _, err := BuildFromString(` [job-exec "foo"] schedule = @every 10s @@ -31,10 +40,9 @@ func (s *SuiteConfig) TestBuildFromString(c *C) { [job-service-run "bob"] schedule = @every 10s - `) + `, &mockLogger) c.Assert(err, IsNil) - c.Assert(sh.Jobs, HasLen, 5) } func (s *SuiteConfig) TestJobDefaultsSet(c *C) { @@ -122,6 +130,7 @@ func (s *SuiteConfig) TestLabelsConfig(c *C) { ExpectedConfig: Config{}, Comment: "No service label, no 'local' jobs", }, + { Labels: map[string]map[string]string{ "some": map[string]string{ @@ -156,6 +165,10 @@ func (s *SuiteConfig) TestLabelsConfig(c *C) { Schedule: "schedule2", Command: "command2", }}}, + "job5": &RunJobConfig{RunJob: core.RunJob{BareJob: core.BareJob{ + Schedule: "schedule5", + Command: "command5", + }}}, }, ServiceJobs: map[string]*RunServiceConfig{ "job3": &RunServiceConfig{RunServiceJob: core.RunServiceJob{BareJob: core.BareJob{ @@ -166,6 +179,7 @@ func (s *SuiteConfig) TestLabelsConfig(c *C) { }, Comment: "Local/Run/Service jobs from non-service container ignored", }, + { Labels: map[string]map[string]string{ "some": map[string]string{ diff --git a/cli/daemon.go b/cli/daemon.go index 491488dfa..cbf4a1825 100644 --- a/cli/daemon.go +++ b/cli/daemon.go @@ -10,20 +10,15 @@ import ( // DaemonCommand daemon process type DaemonCommand struct { - ConfigFile string `long:"config" description:"configuration file" default:"/etc/ofelia.conf"` - DockerLabelsConfig bool `short:"d" long:"docker" description:"read configurations from docker labels"` - - config *Config - scheduler *core.Scheduler - signals chan os.Signal - done chan bool + ConfigFile string `long:"config" description:"configuration file" default:"/etc/ofelia.conf"` + scheduler *core.Scheduler + signals chan os.Signal + done chan bool + Logger core.Logger } // Execute runs the daemon func (c *DaemonCommand) Execute(args []string) error { - _, err := os.Stat("/.dockerenv") - IsDockerEnv = !os.IsNotExist(err) - if err := c.boot(); err != nil { return err } @@ -40,13 +35,19 @@ func (c *DaemonCommand) Execute(args []string) error { } func (c *DaemonCommand) boot() (err error) { - if c.DockerLabelsConfig { - c.scheduler, err = BuildFromDockerLabels() - } else { - c.scheduler, err = BuildFromFile(c.ConfigFile) + // Always try to read the config file, as there are options such as globals or some tasks that can be specified there and not in docker + config, err := BuildFromFile(c.ConfigFile, c.Logger) + if err != nil { + c.Logger.Debugf("Config file: %v not found", c.ConfigFile) + } + + err = config.InitializeApp() + if err != nil { + c.Logger.Criticalf("Can't start the app: %v", err) } + c.scheduler = config.sh - return + return err } func (c *DaemonCommand) start() error { @@ -66,7 +67,7 @@ func (c *DaemonCommand) setSignals() { go func() { sig := <-c.signals - c.scheduler.Logger.Warningf( + c.Logger.Warningf( "Signal received: %s, shutting down the process\n", sig, ) @@ -80,6 +81,6 @@ func (c *DaemonCommand) shutdown() error { return nil } - c.scheduler.Logger.Warningf("Waiting running jobs.") + c.Logger.Warningf("Waiting running jobs.") return c.scheduler.Stop() } diff --git a/cli/docker-labels.go b/cli/docker-labels.go index 9ba796ebe..ea2eb71fc 100644 --- a/cli/docker-labels.go +++ b/cli/docker-labels.go @@ -2,11 +2,8 @@ package cli import ( "encoding/json" - "errors" "strings" - "time" - docker "github.com/fsouza/go-dockerclient" "github.com/mitchellh/mapstructure" ) @@ -18,47 +15,6 @@ const ( serviceLabel = labelPrefix + ".service" ) -func getLabels(d *docker.Client) (map[string]map[string]string, error) { - // sleep before querying containers - // because docker not always propagating labels in time - // so ofelia app can't find it's own container - if IsDockerEnv { - time.Sleep(1 * time.Second) - } - - conts, err := d.ListContainers(docker.ListContainersOptions{ - Filters: map[string][]string{ - "label": {requiredLabelFilter}, - }, - }) - if err != nil { - return nil, err - } - - if len(conts) == 0 { - return nil, errors.New("Couldn't find containers with label 'ofelia.enabled=true'") - } - - var labels = make(map[string]map[string]string) - - for _, c := range conts { - if len(c.Names) > 0 && len(c.Labels) > 0 { - name := strings.TrimPrefix(c.Names[0], "/") - for k := range c.Labels { - // remove all not relevant labels - if !strings.HasPrefix(k, labelPrefix) { - delete(c.Labels, k) - continue - } - } - - labels[name] = c.Labels - } - } - - return labels, nil -} - func (c *Config) buildFromDockerLabels(labels map[string]map[string]string) error { execJobs := make(map[string]map[string]interface{}) localJobs := make(map[string]map[string]interface{}) @@ -109,7 +65,7 @@ func (c *Config) buildFromDockerLabels(labels map[string]map[string]string) erro serviceJobs[jobName] = make(map[string]interface{}) } setJobParam(serviceJobs[jobName], jopParam, v) - case jobType == jobRun && isServiceContainer: + case jobType == jobRun: if _, ok := runJobs[jobName]; !ok { runJobs[jobName] = make(map[string]interface{}) } diff --git a/cli/docker_config_handler.go b/cli/docker_config_handler.go new file mode 100644 index 000000000..62e8cb30f --- /dev/null +++ b/cli/docker_config_handler.go @@ -0,0 +1,105 @@ +package cli + +import ( + "errors" + "strings" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/mcuadros/ofelia/core" +) + +var ErrNoContainerWithOfeliaEnabled = errors.New("Couldn't find containers with label 'ofelia.enabled=true'") + +type DockerHandler struct { + dockerClient *docker.Client + notifier dockerLabelsUpdate + logger core.Logger +} + +type dockerLabelsUpdate interface { + dockerLabelsUpdate(map[string]map[string]string) +} + +// TODO: Implement an interface so the code does not have to use third parties directly +func (c *DockerHandler) GetInternalDockerClient() *docker.Client { + return c.dockerClient +} + +func (c *DockerHandler) buildDockerClient() (*docker.Client, error) { + d, err := docker.NewClientFromEnv() + if err != nil { + return nil, err + } + + return d, nil +} + +func NewDockerHandler(notifier dockerLabelsUpdate, logger core.Logger) (*DockerHandler, error) { + c := &DockerHandler{} + var err error + c.dockerClient, err = c.buildDockerClient() + c.notifier = notifier + c.logger = logger + if err != nil { + return nil, err + } + // Do a sanity check on docker + _, err = c.dockerClient.Info() + if err != nil { + return nil, err + } + + go c.watch() + return c, nil +} + +func (c *DockerHandler) watch() { + // Poll for changes + tick := time.Tick(10000 * time.Millisecond) + for { + select { + case <-tick: + labels, err := c.GetDockerLabels() + // Do not print or care if there is no container up right now + if err != nil && !errors.Is(err, ErrNoContainerWithOfeliaEnabled) { + c.logger.Debugf("%v", err) + } + c.notifier.dockerLabelsUpdate(labels) + } + } +} + +func (c *DockerHandler) GetDockerLabels() (map[string]map[string]string, error) { + conts, err := c.dockerClient.ListContainers(docker.ListContainersOptions{ + Filters: map[string][]string{ + "label": {requiredLabelFilter}, + }, + }) + if err != nil { + return nil, err + } + + if len(conts) == 0 { + return nil, ErrNoContainerWithOfeliaEnabled + } + + var labels = make(map[string]map[string]string) + + for _, c := range conts { + if len(c.Names) > 0 && len(c.Labels) > 0 { + name := strings.TrimPrefix(c.Names[0], "/") + for k := range c.Labels { + // remove all not relevant labels + if !strings.HasPrefix(k, labelPrefix) { + delete(c.Labels, k) + continue + } + } + + labels[name] = c.Labels + } + } + + return labels, nil +} diff --git a/cli/validate.go b/cli/validate.go index 85e4660a1..9dcd0b654 100644 --- a/cli/validate.go +++ b/cli/validate.go @@ -1,30 +1,23 @@ package cli -import "fmt" +import ( + "github.com/mcuadros/ofelia/core" +) // ValidateCommand validates the config file type ValidateCommand struct { ConfigFile string `long:"config" description:"configuration file" default:"/etc/ofelia.conf"` + Logger core.Logger } // Execute runs the validation command func (c *ValidateCommand) Execute(args []string) error { - fmt.Printf("Validating %q ... ", c.ConfigFile) - config, err := BuildFromFile(c.ConfigFile) + c.Logger.Debugf("Validating %q ... ", c.ConfigFile) + _, err := BuildFromFile(c.ConfigFile, c.Logger) if err != nil { - fmt.Println("ERROR") + c.Logger.Errorf("ERROR") return err } - - fmt.Println("OK") - fmt.Printf("Found %d jobs:\n", len(config.Jobs)) - - for _, j := range config.Jobs { - fmt.Printf( - "- name: %s schedule: %q command: %q\n", - j.GetName(), j.GetSchedule(), j.GetCommand(), - ) - } - + c.Logger.Debugf("OK") return nil } diff --git a/core/job.go b/core/bare_job.go similarity index 55% rename from core/job.go rename to core/bare_job.go index 90dcc8fd2..b640d5280 100644 --- a/core/job.go +++ b/core/bare_job.go @@ -1,19 +1,21 @@ package core import ( + "reflect" "sync" "sync/atomic" ) type BareJob struct { - Schedule string - Name string - Command string + Schedule string `hash:"true"` + Name string `hash:"true"` + Command string `hash:"true"` middlewareContainer running int32 lock sync.Mutex history []*Execution + cronID int } func (j *BareJob) GetName() string { @@ -39,3 +41,18 @@ func (j *BareJob) NotifyStart() { func (j *BareJob) NotifyStop() { atomic.AddInt32(&j.running, -1) } + +func (j *BareJob) GetCronJobID() int { + return j.cronID +} + +func (j *BareJob) SetCronJobID(id int) { + j.cronID = id +} + +// Returns a hash of all the job attributes. Used to detect changes +func (j *BareJob) Hash() string { + var hash string + getHash(reflect.TypeOf(j).Elem(), reflect.ValueOf(j).Elem(), &hash) + return hash +} diff --git a/core/job_test.go b/core/bare_job_test.go similarity index 100% rename from core/job_test.go rename to core/bare_job_test.go diff --git a/core/common.go b/core/common.go index f9fc7f1c3..11f9f634f 100644 --- a/core/common.go +++ b/core/common.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "reflect" + "strconv" "strings" "time" @@ -37,6 +38,8 @@ type Job interface { Running() int32 NotifyStart() NotifyStop() + GetCronJobID() int + SetCronJobID(int) } type Context struct { @@ -299,3 +302,31 @@ func buildAuthConfiguration(registry string) docker.AuthConfiguration { return auth } + +const HashmeTagName = "hash" + +func getHash(t reflect.Type, v reflect.Value, hash *string) { + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + fieldv := v.Field(i) + kind := field.Type.Kind() + + if kind == reflect.Struct { + getHash(field.Type, fieldv, hash) + continue + } + + hashmeTag := field.Tag.Get(HashmeTagName) + if hashmeTag == "true" { + if kind == reflect.String { + *hash += fieldv.String() + } else if kind == reflect.Int32 || kind == reflect.Int || kind == reflect.Int64 || kind == reflect.Int16 || kind == reflect.Int8 { + *hash += strconv.FormatInt(fieldv.Int(), 10) + } else if kind == reflect.Bool { + *hash += strconv.FormatBool(fieldv.Bool()) + } else { + panic("Unsupported field type") + } + } + } +} diff --git a/core/common_test.go b/core/common_test.go index d7430e5bd..11d4015b6 100644 --- a/core/common_test.go +++ b/core/common_test.go @@ -227,7 +227,6 @@ func (s *SuiteCommon) TestExecutionStopErrorSkip(c *C) { c.Assert(exe.Failed, Equals, false) c.Assert(exe.Skipped, Equals, true) c.Assert(exe.Error, Equals, nil) - c.Assert(exe.Duration.Seconds() > .0, Equals, true) } func (s *SuiteCommon) TestMiddlewareContainerUseTwice(c *C) { diff --git a/core/cron_utils.go b/core/cron_utils.go new file mode 100644 index 000000000..88b52d786 --- /dev/null +++ b/core/cron_utils.go @@ -0,0 +1,18 @@ +package core + +// Implement the cron logger interface +type CronUtils struct { + Logger Logger +} + +func NewCronUtils(l Logger) *CronUtils { + return &CronUtils{Logger: l} +} + +func (c *CronUtils) Info(msg string, keysAndValues ...interface{}) { + c.Logger.Debugf(msg) // TODO, pass in the keysAndValues +} + +func (c *CronUtils) Error(err error, msg string, keysAndValues ...interface{}) { + c.Logger.Errorf("msg: %v, error: %v", msg, err) // TODO, pass in the keysAndValues +} diff --git a/core/execjob.go b/core/execjob.go index 9bea17bcc..901df2818 100644 --- a/core/execjob.go +++ b/core/execjob.go @@ -10,9 +10,9 @@ import ( type ExecJob struct { BareJob `mapstructure:",squash"` Client *docker.Client `json:"-"` - Container string - User string `default:"root"` - TTY bool `default:"false"` + Container string `hash:"true"` + User string `default:"root" hash:"true"` + TTY bool `default:"false" hash:"true"` Environment []string execID string diff --git a/core/scheduler.go b/core/scheduler.go index 7733889df..e2fbe9963 100644 --- a/core/scheduler.go +++ b/core/scheduler.go @@ -5,7 +5,7 @@ import ( "fmt" "sync" - "github.com/robfig/cron" + "github.com/robfig/cron/v3" ) var ( @@ -24,47 +24,41 @@ type Scheduler struct { } func NewScheduler(l Logger) *Scheduler { + cronUtils := NewCronUtils(l) return &Scheduler{ Logger: l, - cron: cron.New(), + cron: cron.New(cron.WithLogger(cronUtils), cron.WithChain(cron.Recover(cronUtils))), } } func (s *Scheduler) AddJob(j Job) error { - s.Logger.Noticef("New job registered %q - %q - %q", j.GetName(), j.GetCommand(), j.GetSchedule()) - if j.GetSchedule() == "" { return ErrEmptySchedule } - err := s.cron.AddJob(j.GetSchedule(), &jobWrapper{s, j}) + id, err := s.cron.AddJob(j.GetSchedule(), &jobWrapper{s, j}) if err != nil { return err } + j.SetCronJobID(int(id)) // Cast to int in order to avoid pushing cron external to common + j.Use(s.Middlewares()...) + s.Logger.Noticef("New job registered %q - %q - %q - ID: %v", j.GetName(), j.GetCommand(), j.GetSchedule(), id) + return nil +} - s.Jobs = append(s.Jobs, j) +func (s *Scheduler) RemoveJob(j Job) error { + s.Logger.Noticef("Job deregistered (will not fire again) %q - %q - %q - ID: %v", j.GetName(), j.GetCommand(), j.GetSchedule(), j.GetCronJobID()) + s.cron.Remove(cron.EntryID(j.GetCronJobID())) return nil } func (s *Scheduler) Start() error { - if len(s.Jobs) == 0 { - return ErrEmptyScheduler - } - - s.Logger.Debugf("Starting scheduler with %d jobs", len(s.Jobs)) - - s.mergeMiddlewares() + s.Logger.Debugf("Starting scheduler") s.isRunning = true s.cron.Start() return nil } -func (s *Scheduler) mergeMiddlewares() { - for _, j := range s.Jobs { - j.Use(s.Middlewares()...) - } -} - func (s *Scheduler) Stop() error { s.wg.Wait() s.cron.Stop() diff --git a/core/scheduler_test.go b/core/scheduler_test.go index 31d957b3b..5be8b37b2 100644 --- a/core/scheduler_test.go +++ b/core/scheduler_test.go @@ -17,7 +17,6 @@ func (s *SuiteScheduler) TestAddJob(c *C) { sc := NewScheduler(&TestLogger{}) err := sc.AddJob(job) c.Assert(err, IsNil) - c.Assert(sc.Jobs, HasLen, 1) e := sc.cron.Entries() c.Assert(e, HasLen, 1) @@ -51,7 +50,6 @@ func (s *SuiteScheduler) TestMergeMiddlewaresSame(c *C) { sc := NewScheduler(&TestLogger{}) sc.Use(mA) sc.AddJob(job) - sc.mergeMiddlewares() m := job.Middlewares() c.Assert(m, HasLen, 1) diff --git a/docs/jobs.md b/docs/jobs.md index d4bf0bd3f..56e53c1bc 100644 --- a/docs/jobs.md +++ b/docs/jobs.md @@ -51,8 +51,6 @@ tty = false ### Docker labels example -`ofelia` container should be started **after** nginx container, to be able to read its labels, because real time labels reading is not supported yet. - ```sh docker run -it --rm \ --label ofelia.enabled=true \ @@ -130,9 +128,7 @@ environment = FOO=bar Then you can check output in host machine file `/tmp/test/date` -### Docker labels example - -Docker run job has to be configured as labels on the `ofelia` container itself, because it is going to start new container: +### Running ofelia on Docker example ```sh docker run -it --rm \ @@ -182,21 +178,6 @@ command = touch test.txt dir = /tmp/ ``` -### Docker labels example - -Docker run job has to be configured as labels on the `ofelia` container itself, because it will be executed inside `ofelia` container - -```sh -docker run -it --rm \ - -v /var/run/docker.sock:/var/run/docker.sock:ro \ - --label ofelia.enabled=true \ - --label ofelia.job-local.create-file.schedule="@every 15s" \ - --label ofelia.job-local.create-file.image="alpine:latest" \ - --label ofelia.job-local.create-file.command="touch test.txt" \ - --label ofelia.job-local.create-file.dir="/tmp/" \ - mcuadros/ofelia:latest daemon --docker -``` - ## Job-service-run This job can be used to: diff --git a/go.mod b/go.mod index aed9a23d5..5650c9f14 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/mcuadros/go-defaults v1.2.0 github.com/mitchellh/mapstructure v1.4.2 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 - github.com/robfig/cron v1.2.0 + github.com/robfig/cron/v3 v3.0.1 go.opencensus.io v0.23.0 // indirect golang.org/x/sys v0.0.0-20211002104244-808efd93c36d // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect diff --git a/go.sum b/go.sum index 7134bd257..0117e5f26 100644 --- a/go.sum +++ b/go.sum @@ -546,8 +546,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/ofelia.go b/ofelia.go index 731ae9a41..4a71d6cb2 100644 --- a/ofelia.go +++ b/ofelia.go @@ -6,15 +6,28 @@ import ( "github.com/jessevdk/go-flags" "github.com/mcuadros/ofelia/cli" + "github.com/mcuadros/ofelia/core" + "github.com/op/go-logging" ) var version string var build string +const logFormat = "%{time} %{color} %{shortfile} ▶ %{level} %{color:reset} %{message}" + +func buildLogger() core.Logger { + stdout := logging.NewLogBackend(os.Stdout, "", 0) + // Set the backends to be used. + logging.SetBackend(stdout) + logging.SetFormatter(logging.MustStringFormatter(logFormat)) + return logging.MustGetLogger("ofelia") +} + func main() { + logger := buildLogger() parser := flags.NewNamedParser("ofelia", flags.Default) - parser.AddCommand("daemon", "daemon process", "", &cli.DaemonCommand{}) - parser.AddCommand("validate", "validates the config file", "", &cli.ValidateCommand{}) + parser.AddCommand("daemon", "daemon process", "", &cli.DaemonCommand{Logger: logger}) + parser.AddCommand("validate", "validates the config file", "", &cli.ValidateCommand{Logger: logger}) if _, err := parser.Parse(); err != nil { if _, ok := err.(*flags.Error); ok { diff --git a/test/docker-compose.yml b/test/docker-compose.yml deleted file mode 100644 index 57043594a..000000000 --- a/test/docker-compose.yml +++ /dev/null @@ -1,17 +0,0 @@ -version: "3.3" -services: - zookeeper: - image: znly/zookeeper:3.4.8 - labels: - - "deployment.name=mesos" - - "deployment.taskname=zookeeper" - volumes: - - zookeeper-data:/data/zookeeper - environment: - - ZOO_ID=1 - - ZOO_SERVERS=zookeeper - -volumes: - zookeeper-data: - -