diff --git a/assets/filebeat/filebeat.tpl b/assets/filebeat/filebeat.tpl index 850827ab..97e279bd 100644 --- a/assets/filebeat/filebeat.tpl +++ b/assets/filebeat/filebeat.tpl @@ -2,7 +2,7 @@ - type: log enabled: true paths: - - {{ .LogFile }} + - {{ .LogFile }} scan_frequency: 10s fields_under_root: true {{if .Stdout}} @@ -12,10 +12,10 @@ cri_flags: true {{end}} fields: - cluster: ${CLUSTER_ID} - {{- range $key, $value := .Tags }} - {{ $key }}: "{{ $value }}" - {{- end }} + cluster: ${CLUSTER_ID} + {{- range $key, $value := .Tags }} + {{ $key }}: "{{ $value }}" + {{- end }} tail_files: false # Harvester closing options close_eof: false @@ -26,5 +26,11 @@ # State options clean_removed: true clean_inactive: 72h + # If an ES receiver failed to index a message, Filebeat will output an error message matching the format below; + # this error message could be gathered again and thus form a loop, and likely consume a lot of resources; + # therefore, we exclude this kind of error message when gathering logs from a Filebeat container + {{if $.isFilebeat}} + exclude_lines: ['^[0-9]{4}-[0-9]{1,2}-[0-9]{1,2}T.+WARN.+elasticsearch/client.+Cannot index event publisher.+'] + {{end}} {{- end}} diff --git a/cmd/log-pilot/main.go b/cmd/log-pilot/main.go index 91d2ad51..21c1411d 100644 --- a/cmd/log-pilot/main.go +++ b/cmd/log-pilot/main.go @@ -15,17 +15,20 @@ import ( ) var ( - template = flag.String("path.template", "", "Template file path for filebeat") - filebeatHome = flag.String("path.filebeat-home", "", "Filebeat home path") - base = flag.String("path.base", "/", "Directory which mount host path") - logPath = flag.String("path.logs", "", "Logs path") - logPrefix = flag.String("logPrefix", "caicloud", "Log prefix of the env parameters. Multiple prefixes should be separated by \",\"") - logLevel = flag.String("logLevel", "info", "Log level: debug, info, warning, error, critical") - wListNS = flag.String("namespace.whitelist", "", "whitelist of namespaces to watch") - bListNS = flag.String("namespace.blacklist", "", "blacklist of namespaces to ignore") - logMaxBytes = flag.Uint("log.maxSize", 10*1024*1024, "Max size of log file in bytes") - logMaxBackups = flag.Uint("log.maxBackups", 7, "Max backups of log files") - logToStderr = flag.Bool("e", false, "Log to stderr") + template = flag.String("path.template", "", "Template file path for filebeat") + filebeatHome = flag.String("path.filebeat-home", "", "Filebeat home path") + base = flag.String("path.base", "/", "Directory which mount host path") + logPath = flag.String("path.logs", "", "Logs path") + logPrefix = flag.String("logPrefix", "caicloud", "Log prefix of the env parameters. Multiple prefixes should be separated by \",\"") + logLevel = flag.String("logLevel", "info", "Log level: debug, info, warning, error, critical") + wListNS = flag.String("namespace.whitelist", "", "whitelist of namespaces to watch") + bListNS = flag.String("namespace.blacklist", "", "blacklist of namespaces to ignore") + logMaxBytes = flag.Uint("log.maxSize", 10*1024*1024, "Max size of log file in bytes") + logMaxBackups = flag.Uint("log.maxBackups", 7, "Max backups of log files") + logToStderr = flag.Bool("e", false, "Log to stderr") + filebeatNamespace = flag.String("filebeat.namespace", "kube-system", "namespace where Filebeat is deployed") + filebeatPodRegex = flag.String("filebeat.podRegex", "^logging-filebeat.+", "regex with what to match the name of the Filebeat Pod") + filebeatContainer = flag.String("filebeat.container", "c0", "name of the Filebeat container") ) func main() { @@ -38,7 +41,7 @@ func main() { log.Fatal("Invalid path.base:", err) } var cfgr configurer.Configurer - cfgr, err = filebeat.New(baseDir, *template, *filebeatHome) + cfgr, err = filebeat.New(baseDir, *template, *filebeatHome, *filebeatNamespace, *filebeatPodRegex, *filebeatContainer) if err != nil { log.Fatalf("Error create configurer: %v", err) } diff --git a/pilot/configurer/filebeat/filebeat.go b/pilot/configurer/filebeat/filebeat.go index 3d79a2d2..7301aec5 100644 --- a/pilot/configurer/filebeat/filebeat.go +++ b/pilot/configurer/filebeat/filebeat.go @@ -12,14 +12,14 @@ import ( "sync" "text/template" "time" + "regexp" - "github.com/caicloud/log-pilot/pilot/container" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/go-ucfg" + "github.com/caicloud/log-pilot/pilot/container" "github.com/caicloud/log-pilot/pilot/configurer" "github.com/caicloud/log-pilot/pilot/log" - - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/go-ucfg" ) // logStates contains states in filebeat registry and related to the container @@ -33,17 +33,24 @@ type filebeatConfigurer struct { name string base string // Filebeat home path. - filebeatHome string - tmpl *template.Template - closeCh chan bool - watchDuration time.Duration - watchContainer map[string]*logStates - logger log.Logger - lock sync.Mutex + filebeatHome string + tmpl *template.Template + closeCh chan bool + watchDuration time.Duration + watchContainer map[string]*logStates + logger log.Logger + lock sync.Mutex + filebeatNamespace string + filebeatPodRegex *regexp.Regexp + filebeatContainer string } // New creates a new filebeat configurer. -func New(baseDir, configTemplateFile, filebeatHome string) (configurer.Configurer, error) { +func New(baseDir, configTemplateFile, filebeatHome, filebeatNamespace, filebeatPodRegex, filebeatContainer string) (configurer.Configurer, error) { + filebeatPodCompiledRegex, err := regexp.Compile(filebeatPodRegex) + if err != nil { + return nil, fmt.Errorf("parse Pod name regex: %s", err.Error()) + } t, err := template.ParseFiles(configTemplateFile) if err != nil { return nil, fmt.Errorf("error parse log template: %v", err) @@ -55,14 +62,17 @@ func New(baseDir, configTemplateFile, filebeatHome string) (configurer.Configure logger := logp.NewLogger("configurer") c := &filebeatConfigurer{ - logger: logger, - name: "filebeat", - filebeatHome: filebeatHome, - base: baseDir, - tmpl: t, - closeCh: make(chan bool), - watchContainer: make(map[string]*logStates, 0), - watchDuration: 60 * time.Second, + logger: logger, + name: "filebeat", + filebeatHome: filebeatHome, + base: baseDir, + tmpl: t, + closeCh: make(chan bool), + watchContainer: make(map[string]*logStates, 0), + watchDuration: 60 * time.Second, + filebeatNamespace: filebeatNamespace, + filebeatPodRegex: filebeatPodCompiledRegex, + filebeatContainer: filebeatContainer, } if err := os.MkdirAll(c.getInputsDir(), 0644); err != nil { @@ -324,12 +334,24 @@ func (c *filebeatConfigurer) render(ev *configurer.ContainerAddEvent) (string, e "containerId": ev.Container.ID, "configList": ev.LogConfigs, } + // If an ES receiver failed to index a message, Filebeat will output an error message matching the format below; + // this error message could be gathered again and thus form a loop, and likely consume a lot of resources; + // therefore, we exclude this kind of error message when gathering logs from a Filebeat container + if c.isFilebeatContainer(ev.Container) { + context["isFilebeat"] = true + } if err := c.tmpl.Execute(&buf, context); err != nil { return "", err } return buf.String(), nil } +func (c *filebeatConfigurer) isFilebeatContainer(cInfo container.Container) bool { + return cInfo.Namespace == c.filebeatNamespace && + c.filebeatPodRegex.MatchString(cInfo.Pod) && + c.filebeatContainer == cInfo.Name +} + func (c *filebeatConfigurer) getRegsitryState() (map[string]RegistryState, error) { f, err := os.Open(c.getRegistryFile()) if err != nil {