-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathpinoSeqStream.js
117 lines (103 loc) · 3.06 KB
/
pinoSeqStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"use strict";
let stream = require('stream');
let seq = require('seq-logging');
/**
 * Lookup table from the numeric `level` of an incoming log event to the
 * level name attached to the event that is handed to the Seq logger.
 * Levels without an entry resolve to `undefined`.
 */
const LEVEL_NAMES = {
  10: 'Verbose',
  20: 'Debug',
  30: 'Information',
  40: 'Warning',
  50: 'Error',
  60: 'Fatal',
};
/**
 * Writable stream that turns JSON log lines into events for a `seq-logging`
 * Logger.
 *
 * Each chunk written to the stream is expected to be one JSON-encoded log
 * record. Chunks that are not a JSON object are echoed to stderr and, when
 * `logOtherAs` is configured, buffered and sent as a single combined event
 * at that level.
 */
class PinoSeqStream extends stream.Writable {
  /**
   * @param {object} [config] - Options. `additionalProperties` and
   *   `logOtherAs` are consumed by this class; every other key is passed to
   *   `seq.Logger` unchanged.
   * @param {object} [config.additionalProperties] - Properties merged into
   *   every emitted event (event-specific properties win on key clashes).
   * @param {string} [config.logOtherAs] - Level used for lines that are not
   *   JSON objects. When unset such lines are only echoed to stderr.
   * @param {Function} [config.onError] - Logger error handler; defaults to
   *   reporting the failure on stderr.
   */
  constructor(config) {
    super();
    // Rest destructuring already produces a fresh object, so the caller's
    // config is never mutated when onError is defaulted below.
    const { additionalProperties, logOtherAs, ...loggerConfig } = config == null ? {} : config;
    loggerConfig.onError = loggerConfig.onError || function (e) {
      console.error('[PinoSeqStream] Log batch failed\n', e);
    };
    this._additionalProperties = additionalProperties;
    this._logOtherAs = logOtherAs;
    this._bufferTime = false;
    this._buffer = [];
    this._flushTimer = null;
    this._logger = new seq.Logger(loggerConfig);
  }

  /**
   * Writable implementation: forwards one chunk to the logger.
   *
   * @param {Buffer|string} message - One log line.
   * @param {string} enc - Chunk encoding (unused).
   * @param {Function} cb - Always invoked without an error; failures are
   *   reported on stderr instead of failing the stream.
   */
  _write(message, enc, cb) {
    if (message) {
      const event = this._parseEvent(message);
      if (event) {
        this._emitStructured(event);
      } else {
        const text = String(message);
        console.error(text);
        if (this._logOtherAs) {
          this.handleUnstructuredMessage(text);
        }
      }
    }
    cb();
  }

  /**
   * Parses a chunk as a JSON log record.
   *
   * @param {Buffer|string} message - Raw chunk.
   * @returns {object|null} The parsed record, or `null` when the chunk is not
   *   valid JSON or is valid JSON that is not a plain object (number, string,
   *   boolean, `null`, array) and therefore cannot be a log record.
   */
  _parseEvent(message) {
    let parsed;
    try {
      parsed = JSON.parse(message);
    } catch (parseError) {
      return null;
    }
    const isRecord = parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed);
    return isRecord ? parsed : null;
  }

  /**
   * Converts a parsed log record to the logger's event shape and emits it.
   *
   * @param {object} event - Parsed log record.
   */
  _emitStructured(event) {
    const { time, level, msg, err, error, stack, ...props } = event;

    // `err` takes precedence over `error`; when both are present `error` is
    // not forwarded.
    const errorValue = err || error;
    let errMessage;
    let errStack;
    let errorProps = {};
    if (errorValue !== null && typeof errorValue === 'object') {
      // Lift the remaining fields of the error object into the properties.
      ({ message: errMessage, stack: errStack, ...errorProps } = errorValue);
    } else if (errorValue) {
      // A primitive error (e.g. a string) has no fields to lift. Destructuring
      // it would spread its characters into properties, so keep it under its
      // original key instead.
      props[err ? 'err' : 'error'] = errorValue;
    }

    // A record without a usable `time` would otherwise produce an Invalid Date.
    const parsedTime = time == null ? new Date() : new Date(time);
    const timestamp = Number.isNaN(parsedTime.getTime()) ? new Date() : parsedTime;

    const forSeq = {
      timestamp,
      level: LEVEL_NAMES[level],
      traceId: props.trace_id,
      spanId: props.span_id,
      messageTemplate: msg || errMessage,
      properties: { ...this._additionalProperties, ...errorProps, ...props },
      exception: stack || errStack,
    };

    // Sending is isolated so that a logger failure is reported on stderr
    // rather than propagated to the stream.
    try {
      // A structured record ends the current run of unstructured lines, so
      // send those first.
      if (this._logOtherAs) {
        this.flushBuffer();
      }
      this._logger.emit(forSeq);
    } catch (emitError) {
      console.error(emitError);
    }
  }

  /**
   * Buffers a line that is not a JSON log record.
   *
   * The buffer is flushed at most one second after the first line of a batch
   * was buffered; the timer is not restarted by later lines.
   *
   * @param {string} message - The raw line.
   */
  handleUnstructuredMessage(message) {
    if (!this._bufferTime) {
      // The combined event is stamped with the arrival time of its first line.
      this._bufferTime = new Date();
    }
    this._buffer.push(message);
    if (!this._flushTimer) {
      this._flushTimer = setTimeout(() => {
        this.flushBuffer();
      }, 1000);
    }
  }

  /**
   * Emits all buffered unstructured lines as one event (joined with newlines)
   * at the `logOtherAs` level and empties the buffer. If emitting throws, the
   * error is reported on stderr and the buffer is kept.
   */
  flushBuffer() {
    // Cancel and forget the pending timer. Resetting the handle is what lets
    // handleUnstructuredMessage arm a new timer for the next batch.
    if (this._flushTimer) {
      clearTimeout(this._flushTimer);
      this._flushTimer = null;
    }
    if (!this._buffer.length) {
      return;
    }
    try {
      this._logger.emit({
        timestamp: this._bufferTime,
        level: this._logOtherAs,
        messageTemplate: this._buffer.join('\n'),
        properties: { ...this._additionalProperties },
      });
      this._bufferTime = false;
      this._buffer = [];
    } catch (err) {
      console.error(err);
    }
  }

  /**
   * Sends any buffered unstructured lines, then flushes the underlying logger.
   *
   * @returns {*} Whatever the logger's `flush()` returns.
   */
  flush() {
    this.flushBuffer();
    return this._logger.flush();
  }

  /**
   * Writable implementation: called when the stream is ending. Sends buffered
   * lines, closes the underlying logger and signals completion once the
   * promise returned by `close()` settles.
   *
   * @param {Function} callback - Invoked with no argument on success or with
   *   the rejection reason on failure.
   */
  _final(callback) {
    this.flushBuffer();
    // Two-argument then(): an exception thrown by callback() itself must not
    // be routed into the rejection handler and trigger a second callback call.
    this._logger.close().then(
      () => callback(),
      (err) => callback(err),
    );
  }
}
// CommonJS export: the stream class is this module's only export.
module.exports = PinoSeqStream;