Skip to content

Commit

Permalink
Splits the log source handling with a pluggable interface.
Browse files Browse the repository at this point in the history
Provides a cleaner split between log sources, specifically for not
compiling with systemd libraries.

This is in preparation for a new log source to read from Docker.
  • Loading branch information
tommie authored and BartVerc committed Mar 16, 2021
1 parent efcf247 commit 82fa993
Show file tree
Hide file tree
Showing 10 changed files with 568 additions and 246 deletions.
63 changes: 63 additions & 0 deletions logsource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"io"

"github.com/alecthomas/kingpin"
)

// A LogSourceFactory provides a repository of log sources that can be
// instantiated from command line flags.
type LogSourceFactory interface {
// Init adds the factory's struct fields as flags in the
// application.
Init(*kingpin.Application)

// New attempts to create a new log source. This is called after
// flags have been parsed. Returning `nil, nil`, means the user
// didn't want this log source.
New() (LogSourceCloser, error)
}

type LogSourceCloser interface {
io.Closer
LogSource
}

var logSourceFactories []LogSourceFactory

// RegisterLogSourceFactory can be called from module `init` functions
// to register factories.
func RegisterLogSourceFactory(lsf LogSourceFactory) {
logSourceFactories = append(logSourceFactories, lsf)
}

// InitLogSourceFactories runs Init on all factories. The
// initialization order is arbitrary, except `fileLogSourceFactory` is
// always last (the fallback). The file log source must be last since
// it's enabled by default.
func InitLogSourceFactories(app *kingpin.Application) {
RegisterLogSourceFactory(&fileLogSourceFactory{})

for _, f := range logSourceFactories {
f.Init(app)
}
}

// NewLogSourceFromFactories iterates through the factories and
// attempts to instantiate a log source. The first factory to return
// success wins.
func NewLogSourceFromFactories() (LogSourceCloser, error) {
for _, f := range logSourceFactories {
src, err := f.New()
if err != nil {
return nil, err
}
if src != nil {
return src, nil
}
}

return nil, fmt.Errorf("no log source configured")
}
78 changes: 78 additions & 0 deletions logsource_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"context"
"io"
"log"

"github.com/alecthomas/kingpin"
"github.com/hpcloud/tail"
)

// A FileLogSource can read lines from a file.
type FileLogSource struct {
tailer *tail.Tail
}

// NewFileLogSource creates a new log source, tailing the given file.
func NewFileLogSource(path string) (*FileLogSource, error) {
tailer, err := tail.TailFile(path, tail.Config{
ReOpen: true, // reopen the file if it's rotated
MustExist: true, // fail immediately if the file is missing or has incorrect permissions
Follow: true, // run in follow mode
Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file
Logger: tail.DiscardingLogger,
})
if err != nil {
return nil, err
}
return &FileLogSource{tailer}, nil
}

func (s *FileLogSource) Close() error {
defer s.tailer.Cleanup()
go func() {
// Stop() waits for the tailer goroutine to shut down, but it
// can be blocking on sending on the Lines channel...
for range s.tailer.Lines {
}
}()
return s.tailer.Stop()
}

func (s *FileLogSource) Path() string {
return s.tailer.Filename
}

func (s *FileLogSource) Read(ctx context.Context) (string, error) {
select {
case line, ok := <-s.tailer.Lines:
if !ok {
return "", io.EOF
}
return line.Text, nil
case <-ctx.Done():
return "", ctx.Err()
}
}

// A fileLogSourceFactory is a factory than can create log sources
// from command line flags.
//
// Because this factory is enabled by default, it must always be
// registered last.
type fileLogSourceFactory struct {
path string
}

func (f *fileLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path)
}

func (f *fileLogSourceFactory) New() (LogSourceCloser, error) {
if f.path == "" {
return nil, nil
}
log.Printf("Reading log events from %s", f.path)
return NewFileLogSource(f.path)
}
87 changes: 87 additions & 0 deletions logsource_file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestFileLogSource_Path(t *testing.T) {
path, close, err := setupFakeLogFile()
if err != nil {
t.Fatalf("setupFakeTailer failed: %v", err)
}
defer close()

src, err := NewFileLogSource(path)
if err != nil {
t.Fatalf("NewFileLogSource failed: %v", err)
}
defer src.Close()

assert.Equal(t, path, src.Path(), "Path should be set by New.")
}

func TestFileLogSource_Read(t *testing.T) {
ctx := context.Background()

path, close, err := setupFakeLogFile()
if err != nil {
t.Fatalf("setupFakeTailer failed: %v", err)
}
defer close()

src, err := NewFileLogSource(path)
if err != nil {
t.Fatalf("NewFileLogSource failed: %v", err)
}
defer src.Close()

s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}

func setupFakeLogFile() (string, func(), error) {
f, err := ioutil.TempFile("", "filelogsource")
if err != nil {
return "", nil, err
}

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
defer os.Remove(f.Name())
defer f.Close()

for {
// The tailer seeks to the end and then does a
// follow. Keep writing lines so we know it wakes up and
// returns lines.
fmt.Fprintln(f, "Feb 13 23:31:30 ahost anid[123]: aline")

select {
case <-time.After(10 * time.Millisecond):
// continue
case <-ctx.Done():
return
}
}
}()

return f.Name(), func() {
cancel()
wg.Wait()
}, nil
}
143 changes: 143 additions & 0 deletions logsource_systemd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// +build !nosystemd,linux

package main

import (
"context"
"fmt"
"io"
"log"
"time"

"github.com/alecthomas/kingpin"
"github.com/coreos/go-systemd/v22/sdjournal"
)

// timeNow is a test fake injection point.
var timeNow = time.Now

// A SystemdLogSource reads log records from the given Systemd
// journal.
type SystemdLogSource struct {
journal SystemdJournal
path string
}

// A SystemdJournal is the journal interface that sdjournal.Journal
// provides. See https://pkg.go.dev/github.com/coreos/go-systemd/sdjournal?tab=doc
type SystemdJournal interface {
io.Closer
AddMatch(match string) error
GetEntry() (*sdjournal.JournalEntry, error)
Next() (uint64, error)
SeekRealtimeUsec(usec uint64) error
Wait(timeout time.Duration) int
}

// NewSystemdLogSource returns a log source for reading Systemd
// journal entries. `unit` and `slice` provide filtering if non-empty
// (with `slice` taking precedence).
func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLogSource, error) {
logSrc := &SystemdLogSource{journal: j, path: path}

var err error
if slice != "" {
err = logSrc.journal.AddMatch("_SYSTEMD_SLICE=" + slice)
} else if unit != "" {
err = logSrc.journal.AddMatch("_SYSTEMD_UNIT=" + unit)
}
if err != nil {
logSrc.journal.Close()
return nil, err
}

// Start at end of journal
if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil {
logSrc.journal.Close()
return nil, err
}

if r := logSrc.journal.Wait(1 * time.Second); r < 0 {
logSrc.journal.Close()
return nil, err
}

return logSrc, nil
}

func (s *SystemdLogSource) Close() error {
return s.journal.Close()
}

func (s *SystemdLogSource) Path() string {
return s.path
}

func (s *SystemdLogSource) Read(ctx context.Context) (string, error) {
c, err := s.journal.Next()
if err != nil {
return "", err
}
if c == 0 {
return "", io.EOF
}

e, err := s.journal.GetEntry()
if err != nil {
return "", err
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))

return fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
), nil
}

// A systemdLogSourceFactory is a factory that can create
// SystemdLogSources from command line flags.
type systemdLogSourceFactory struct {
enable bool
unit, slice, path string
}

func (f *systemdLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(&f.enable)
app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(&f.unit)
app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(&f.slice)
app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path)
}

func (f *systemdLogSourceFactory) New() (LogSourceCloser, error) {
if !f.enable {
return nil, nil
}

log.Println("Reading log events from systemd")
j, path, err := newSystemdJournal(f.path)
if err != nil {
return nil, err
}
return NewSystemdLogSource(j, path, f.unit, f.slice)
}

// newSystemdJournal creates a journal handle. It returns the handle
// and a string representation of it. If `path` is empty, it connects
// to the local journald.
func newSystemdJournal(path string) (*sdjournal.Journal, string, error) {
if path != "" {
j, err := sdjournal.NewJournalFromDir(path)
return j, path, err
}

j, err := sdjournal.NewJournal()
return j, "journald", err
}

func init() {
RegisterLogSourceFactory(&systemdLogSourceFactory{})
}
Loading

0 comments on commit 82fa993

Please sign in to comment.