forked from elastic/logstash-forwarder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logstash-forwarder.go
73 lines (60 loc) · 2.25 KB
/
logstash-forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main
import (
"flag"
"log"
"os"
"runtime/pprof"
"time"
)
// Command-line flags. Each variable is a pointer filled in by flag.Parse,
// so it holds only its default value until main has parsed the arguments.
var (
	// cpuprofile is the path of the file to write a CPU profile to.
	// The empty default leaves profiling off.
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

	// spool_size is handed to the spooler as the flush threshold.
	spool_size = flag.Uint64("spool-size", 1024, "Maximum number of events to spool before a flush is forced.")

	// idle_timeout is handed to the spooler as the idle flush interval.
	idle_timeout = flag.Duration("idle-flush-time", 5*time.Second, "Maximum time to wait for a full spool before flushing anyway")

	// config_file is the path passed to LoadConfig.
	config_file = flag.String("config", "", "The config file to load")

	// use_syslog makes main call configureSyslog before starting the pipeline.
	use_syslog = flag.Bool("log-to-syslog", false, "Log to syslog instead of stdout")

	// from_beginning is not read in this file; presumably consumed by the
	// file-reading code elsewhere in the package — confirm against its users.
	from_beginning = flag.Bool("from-beginning", false, "Read new files from the beginning, instead of the end")
)
// main parses the command-line flags, loads the configuration and starts
// the shipping pipeline.
//
// The basic model of execution:
//   - prospector: finds files in paths/globs to harvest, starts harvesters
//   - harvester: reads a file, sends events to the spooler
//   - spooler: buffers events until ready to flush to the publisher
//   - publisher: writes to the network, notifies registrar
//   - registrar: records positions of files read
//
// Finally, prospector uses the registrar information, on restart, to
// determine where in each file to resume a harvester.
//
// The process exits with a non-zero status when the CPU profile cannot be
// started, when the configuration cannot be loaded, or when the
// configuration lists no files.
func main() {
	flag.Parse()

	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal(err)
		}
		// StartCPUProfile reports an error (for example when profiling is
		// already enabled); without this check a run could silently produce
		// an empty profile and still be killed after 60 seconds.
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal(err)
		}
		go func() {
			// Profile for a fixed 60 seconds, stop the profile, then
			// deliberately bring the whole process down with a panic.
			time.Sleep(60 * time.Second)
			pprof.StopCPUProfile()
			panic("done")
		}()
	}

	config, err := LoadConfig(*config_file)
	if err != nil {
		// A bare return here would exit with status 0 and hide the failure
		// from whatever launched the process.
		// NOTE(review): LoadConfig may already log the cause itself; if so
		// this line repeats it — confirm against LoadConfig.
		log.Fatalf("Failed to load config %q: %v", *config_file, err)
	}

	if len(config.Files) == 0 {
		log.Fatalf("No paths given. What files do you want me to watch?\n")
	}

	// Channels connecting the pipeline stages: single events flow from the
	// harvesters to the spooler, batches flow from the spooler to the
	// publisher and from the publisher to the registrar.
	event_chan := make(chan *FileEvent, 16)
	publisher_chan := make(chan []*FileEvent, 1)
	registrar_chan := make(chan []*FileEvent, 1)

	log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
	if *use_syslog {
		configureSyslog()
	}

	// Prospect the globs/paths given in the configuration and launch
	// harvesters, one prospector goroutine per configured file entry.
	for _, fileconfig := range config.Files {
		go Prospect(fileconfig, event_chan)
	}

	// Harvesters dump events into the spooler.
	go Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)

	go Publishv1(publisher_chan, registrar_chan, &config.Network)

	// registrar records last acknowledged positions in all files. It runs
	// on the main goroutine, so the process ends when Registrar returns.
	Registrar(registrar_chan)
} /* main */