forked from LuBashQ/natzap
-
Notifications
You must be signed in to change notification settings - Fork 0
/
natzap.go
89 lines (79 loc) · 1.83 KB
/
natzap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package natzap
import (
"errors"
"fmt"
"github.com/nats-io/nats.go"
"go.uber.org/zap/zapcore"
)
// Core is a zapcore.Core that publishes encoded log entries to NATS.
//
// Entries go out through JetStream when js is set and through the plain
// connection otherwise.
type Core struct {
	zapcore.LevelEnabler // decides which levels Check accepts

	// Transport.
	subject string                // subject every entry is published to
	con     *nats.Conn            // underlying NATS connection
	js      nats.JetStreamContext // nil unless set by WithJetStream

	// Serialization.
	encoder zapcore.Encoder // turns an entry and its fields into bytes
}
// NewCore builds a Core that encodes entries with encoder and publishes them
// over con, for the levels enabler accepts.
//
// The returned Core has an empty subject and no JetStream context.
func NewCore(enabler zapcore.LevelEnabler, encoder zapcore.Encoder, con *nats.Conn) *Core {
	core := new(Core)
	core.LevelEnabler = enabler
	core.encoder = encoder
	core.con = con
	// subject and js keep their zero values ("" and nil).
	return core
}
// WithSubject sets the subject that entries are published to.
//
// The receiver itself is modified and returned (no copy is made), so the call
// can be chained.
func (core *Core) WithSubject(subject string) *Core {
	configured := core
	configured.subject = subject
	return configured
}
// WithJetStream switches the Core to publishing through JetStream and looks
// up the named stream.
//
// Outcomes:
//   - stream found: the receiver is switched to JetStream and returned with a
//     nil error.
//   - stream not found: the receiver is still switched to JetStream and is
//     returned together with nats.ErrStreamNotFound, so the caller can decide
//     how to proceed.
//   - any other failure: nil and the error are returned, and the receiver is
//     left exactly as it was before the call.
func (core *Core) WithJetStream(stream string) (c *Core, err error) {
	// Keep the context local until the outcome is known, so a failed call
	// never leaves the receiver half-configured.
	js, err := core.con.JetStream()
	if err != nil {
		return nil, err
	}

	_, err = js.StreamInfo(stream)
	switch {
	case err == nil, errors.Is(err, nats.ErrStreamNotFound):
		core.js = js
		return core, err
	default:
		return nil, err
	}
}
// With returns a new core, obtained from clone, whose encoder has the given
// fields added to it. The fields are added to the clone's encoder, not to the
// receiver's.
func (core *Core) With(fields []zapcore.Field) zapcore.Core {
	child := core.clone()
	for i := range fields {
		fields[i].AddTo(child.encoder)
	}
	return child
}
// Check registers this core on the checked entry when the entry's level is
// enabled; otherwise it returns checked unchanged.
func (core *Core) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
	if !core.Enabled(entry.Level) {
		return checked
	}
	return checked.AddCore(entry, core)
}
// Write encodes entry together with fields and publishes the result on the
// configured subject: through JetStream when a JetStream context is set,
// otherwise through the plain NATS connection.
//
// An encoding failure is returned wrapped with %w, so callers can still
// inspect the cause with errors.Is / errors.As. Publish errors are returned
// as-is.
func (core *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
	buffer, err := core.encoder.EncodeEntry(entry, fields)
	if err != nil {
		return fmt.Errorf("%w: failed to encode log entry", err)
	}
	// Release the encoder's buffer once the payload has been handed off.
	defer buffer.Free()

	if core.js == nil {
		return core.con.Publish(core.subject, buffer.Bytes())
	}

	// The publish ack carries nothing this core needs; only the error matters.
	_, err = core.js.Publish(core.subject, buffer.Bytes())
	return err
}
// Sync flushes the NATS connection so that entries already handed to Publish
// are sent to the server before Sync returns.
//
// nats.Conn.Publish only queues data in the client's outbound buffer; without
// a flush, entries written right before the process exits can be lost.
// The error reported by the flush, if any, is returned.
func (core *Core) Sync() error {
	if core.con == nil {
		// Nothing can have been buffered without a connection.
		return nil
	}
	return core.con.Flush()
}
// clone returns a copy of the core with an independent encoder.
//
// The connection, JetStream context and subject are carried over, so a core
// derived through With publishes to the same destination as its parent. Only
// the encoder is cloned, so fields added to the copy do not reach the parent.
func (core *Core) clone() *Core {
	return &Core{
		LevelEnabler: core.LevelEnabler,
		encoder:      core.encoder.Clone(),
		con:          core.con,
		js:           core.js,
		subject:      core.subject,
	}
}