Skip to content

Commit

Permalink
fix looped "Cannot index event" error message (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichuan0620 authored and caicloud-bot committed Nov 27, 2019
1 parent f29f527 commit e2282fa
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 37 deletions.
16 changes: 11 additions & 5 deletions assets/filebeat/filebeat.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
- type: log
enabled: true
paths:
- {{ .LogFile }}
- {{ .LogFile }}
scan_frequency: 10s
fields_under_root: true
{{if .Stdout}}
Expand All @@ -12,10 +12,10 @@
cri_flags: true
{{end}}
fields:
cluster: ${CLUSTER_ID}
{{- range $key, $value := .Tags }}
{{ $key }}: "{{ $value }}"
{{- end }}
cluster: ${CLUSTER_ID}
{{- range $key, $value := .Tags }}
{{ $key }}: "{{ $value }}"
{{- end }}
tail_files: false
# Harvester closing options
close_eof: false
Expand All @@ -26,5 +26,11 @@
# State options
clean_removed: true
clean_inactive: 72h
# If an ES receiver failed to index a message, Filebeat will output an error message matching the format below;
# this error message could be gathered again and thus form a loop, and likely consume a lot of resources;
# therefore, we exclude this kind of error message when gathering logs from a Filebeat container
{{if $.isFilebeat}}
exclude_lines: ['^[0-9]{4}-[0-9]{1,2}-[0-9]{1,2}T.+WARN.+elasticsearch/client.+Cannot index event publisher.+']
{{end}}
{{- end}}

27 changes: 15 additions & 12 deletions cmd/log-pilot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ import (
)

var (
template = flag.String("path.template", "", "Template file path for filebeat")
filebeatHome = flag.String("path.filebeat-home", "", "Filebeat home path")
base = flag.String("path.base", "/", "Directory which mount host path")
logPath = flag.String("path.logs", "", "Logs path")
logPrefix = flag.String("logPrefix", "caicloud", "Log prefix of the env parameters. Multiple prefixes should be separated by \",\"")
logLevel = flag.String("logLevel", "info", "Log level: debug, info, warning, error, critical")
wListNS = flag.String("namespace.whitelist", "", "whitelist of namespaces to watch")
bListNS = flag.String("namespace.blacklist", "", "blacklist of namespaces to ignore")
logMaxBytes = flag.Uint("log.maxSize", 10*1024*1024, "Max size of log file in bytes")
logMaxBackups = flag.Uint("log.maxBackups", 7, "Max backups of log files")
logToStderr = flag.Bool("e", false, "Log to stderr")
template = flag.String("path.template", "", "Template file path for filebeat")
filebeatHome = flag.String("path.filebeat-home", "", "Filebeat home path")
base = flag.String("path.base", "/", "Directory which mount host path")
logPath = flag.String("path.logs", "", "Logs path")
logPrefix = flag.String("logPrefix", "caicloud", "Log prefix of the env parameters. Multiple prefixes should be separated by \",\"")
logLevel = flag.String("logLevel", "info", "Log level: debug, info, warning, error, critical")
wListNS = flag.String("namespace.whitelist", "", "whitelist of namespaces to watch")
bListNS = flag.String("namespace.blacklist", "", "blacklist of namespaces to ignore")
logMaxBytes = flag.Uint("log.maxSize", 10*1024*1024, "Max size of log file in bytes")
logMaxBackups = flag.Uint("log.maxBackups", 7, "Max backups of log files")
logToStderr = flag.Bool("e", false, "Log to stderr")
filebeatNamespace = flag.String("filebeat.namespace", "kube-system", "namespace where Filebeat is deployed")
filebeatPodRegex = flag.String("filebeat.podRegex", "^logging-filebeat.+", "regex with what to match the name of the Filebeat Pod")
filebeatContainer = flag.String("filebeat.container", "c0", "name of the Filebeat container")
)

func main() {
Expand All @@ -38,7 +41,7 @@ func main() {
log.Fatal("Invalid path.base:", err)
}
var cfgr configurer.Configurer
cfgr, err = filebeat.New(baseDir, *template, *filebeatHome)
cfgr, err = filebeat.New(baseDir, *template, *filebeatHome, *filebeatNamespace, *filebeatPodRegex, *filebeatContainer)
if err != nil {
log.Fatalf("Error create configurer: %v", err)
}
Expand Down
62 changes: 42 additions & 20 deletions pilot/configurer/filebeat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
"sync"
"text/template"
"time"
"regexp"

"github.com/caicloud/log-pilot/pilot/container"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/go-ucfg"

"github.com/caicloud/log-pilot/pilot/container"
"github.com/caicloud/log-pilot/pilot/configurer"
"github.com/caicloud/log-pilot/pilot/log"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/go-ucfg"
)

// logStates contains states in filebeat registry and related to the container
Expand All @@ -33,17 +33,24 @@ type filebeatConfigurer struct {
name string
base string
// Filebeat home path.
filebeatHome string
tmpl *template.Template
closeCh chan bool
watchDuration time.Duration
watchContainer map[string]*logStates
logger log.Logger
lock sync.Mutex
filebeatHome string
tmpl *template.Template
closeCh chan bool
watchDuration time.Duration
watchContainer map[string]*logStates
logger log.Logger
lock sync.Mutex
filebeatNamespace string
filebeatPodRegex *regexp.Regexp
filebeatContainer string
}

// New creates a new filebeat configurer.
func New(baseDir, configTemplateFile, filebeatHome string) (configurer.Configurer, error) {
func New(baseDir, configTemplateFile, filebeatHome, filebeatNamespace, filebeatPodRegex, filebeatContainer string) (configurer.Configurer, error) {
filebeatPodCompiledRegex, err := regexp.Compile(filebeatPodRegex)
if err != nil {
return nil, fmt.Errorf("parse Pod name regex: %s", err.Error())
}
t, err := template.ParseFiles(configTemplateFile)
if err != nil {
return nil, fmt.Errorf("error parse log template: %v", err)
Expand All @@ -55,14 +62,17 @@ func New(baseDir, configTemplateFile, filebeatHome string) (configurer.Configure

logger := logp.NewLogger("configurer")
c := &filebeatConfigurer{
logger: logger,
name: "filebeat",
filebeatHome: filebeatHome,
base: baseDir,
tmpl: t,
closeCh: make(chan bool),
watchContainer: make(map[string]*logStates, 0),
watchDuration: 60 * time.Second,
logger: logger,
name: "filebeat",
filebeatHome: filebeatHome,
base: baseDir,
tmpl: t,
closeCh: make(chan bool),
watchContainer: make(map[string]*logStates, 0),
watchDuration: 60 * time.Second,
filebeatNamespace: filebeatNamespace,
filebeatPodRegex: filebeatPodCompiledRegex,
filebeatContainer: filebeatContainer,
}

if err := os.MkdirAll(c.getInputsDir(), 0644); err != nil {
Expand Down Expand Up @@ -324,12 +334,24 @@ func (c *filebeatConfigurer) render(ev *configurer.ContainerAddEvent) (string, e
"containerId": ev.Container.ID,
"configList": ev.LogConfigs,
}
// If an ES receiver failed to index a message, Filebeat will output an error message matching the format below;
// this error message could be gathered again and thus form a loop, and likely consume a lot of resources;
// therefore, we exclude this kind of error message when gathering logs from a Filebeat container
if c.isFilebeatContainer(ev.Container) {
context["isFilebeat"] = true
}
if err := c.tmpl.Execute(&buf, context); err != nil {
return "", err
}
return buf.String(), nil
}

func (c *filebeatConfigurer) isFilebeatContainer(cInfo container.Container) bool {
return cInfo.Namespace == c.filebeatNamespace &&
c.filebeatPodRegex.MatchString(cInfo.Pod) &&
c.filebeatContainer == cInfo.Name
}

func (c *filebeatConfigurer) getRegsitryState() (map[string]RegistryState, error) {
f, err := os.Open(c.getRegistryFile())
if err != nil {
Expand Down

0 comments on commit e2282fa

Please sign in to comment.