Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove default labels #103

Open
wants to merge 3 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ Several usage examples with a test configuration for Grafana+Loki+Promtail resid
### Options
LokiTransport() takes a Javascript object as an input. These are the options that are available, __required in bold__:

| **Parameter** | **Description** | **Example** | **Default** |
| ------------------ | --------------------------------------------------------- | -----------------------| ------------- |
| __`host`__ | URL for Grafana Loki | http://127.0.0.1:3100 | null |
| `interval` | The interval at which batched logs are sent in seconds | 30 | 5 |
| `json` | Use JSON instead of Protobuf for transport | true | false |
| `batching` | If batching is not used, the logs are sent as they come | true | true |
| `clearOnError` | Discard any logs that result in an error during transport | true | false |
| `replaceTimestamp` | Replace any log timestamps with Date.now() | true | false |
| `labels` | custom labels, key-value pairs | { module: 'http' } | undefined |
| `format` | winston format (https://github.com/winstonjs/winston#formats) | simple() | undefined |
| `gracefulShutdown` | Enable/disable graceful shutdown (wait for any unsent batches) | false | true |
| `timeout` | timeout for requests to grafana loki in ms | 30000 | undefined |
| `basicAuth` | basic authentication credentials to access Loki over HTTP | username:password | undefined |
| `onConnectionError`| Loki error connection handler | (err) => console.error(err) | undefined |
| **Parameter** | **Description** | **Example** | **Default** |
| ------------------ | --------------------------------------------------------- | -----------------------|----------------------|
| __`host`__ | URL for Grafana Loki | http://127.0.0.1:3100 | null |
| `interval` | The interval at which batched logs are sent in seconds | 30 | 5 |
| `json` | Use JSON instead of Protobuf for transport | true | false |
| `batching` | If batching is not used, the logs are sent as they come | true | true |
| `clearOnError` | Discard any logs that result in an error during transport | true | false |
| `replaceTimestamp` | Replace any log timestamps with Date.now() | true | false |
| `labels` | custom labels, key-value pairs | { module: 'http' } | {job:'winston-loki'} |
| `format` | winston format (https://github.com/winstonjs/winston#formats) | simple() | undefined |
| `gracefulShutdown` | Enable/disable graceful shutdown (wait for any unsent batches) | false | true |
| `timeout` | timeout for requests to grafana loki in ms | 30000 | undefined |
| `basicAuth` | basic authentication credentials to access Loki over HTTP | username:password | undefined |
| `onConnectionError`| Loki error connection handler | (err) => console.error(err) | undefined |

### Example
With default formatting:
Expand Down
25 changes: 5 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LokiTransport extends Transport {
})

this.useCustomFormat = options.format !== undefined
this.labels = options.labels
this.labels = options.labels || { job: 'winston-loki' }
IppX marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -52,32 +52,17 @@ class LokiTransport extends Transport {
})

// Deconstruct the log
const { label, labels, timestamp, level, message, ...rest } = info

// build custom labels if provided
let lokiLabels = { level: level }

if (this.labels) {
lokiLabels = Object.assign(lokiLabels, this.labels)
} else {
lokiLabels.job = label
}

lokiLabels = Object.assign(lokiLabels, labels)
const { timestamp } = info

// follow the format provided
const line = this.useCustomFormat
? info[MESSAGE]
: `${message} ${
rest && Object.keys(rest).length > 0 ? JSON.stringify(rest) : ''
}`
const line = this.useCustomFormat ? info[MESSAGE] : JSON.stringify(info)
IppX marked this conversation as resolved.
Show resolved Hide resolved

// Make sure all label values are strings
lokiLabels = Object.fromEntries(Object.entries(lokiLabels).map(([key, value]) => [key, value ? value.toString() : value]))
this.labels = Object.fromEntries(Object.entries(this.labels).map(([key, value]) => [key, value ? value.toString() : value]))

// Construct the log to fit Grafana Loki's accepted format
const logEntry = {
labels: lokiLabels,
labels: this.labels,
entries: [
{
ts: timestamp || Date.now().valueOf(),
Expand Down
3 changes: 2 additions & 1 deletion src/batcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const exitHook = require('async-exit-hook')
const { logproto } = require('./proto')
const protoHelpers = require('./proto/helpers')
const req = require('./requests')
const {deepEqual} = require("./helpers");
let snappy = false

/**
Expand Down Expand Up @@ -114,7 +115,7 @@ class Batcher {

// Find if there's already a log with identical labels in the batch
const match = streams.findIndex(
stream => JSON.stringify(stream.labels) === JSON.stringify(logEntry.labels)
stream => deepEqual(stream.labels, logEntry.labels)
)

if (match > -1) {
Expand Down
27 changes: 27 additions & 0 deletions src/helpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
function isObject(object) {
return object != null && typeof object === 'object';
}

function deepEqual(object1, object2) {
const keys1 = Object.keys(object1);
const keys2 = Object.keys(object2);

if (keys1.length !== keys2.length) {
return false;
}

for (const key of keys1) {
const value1 = object1[key];
const value2 = object2[key];

if (isObject(value1) && isObject(value2)) {
if (!deepEqual(value1, value2)) return false;
} else {
if (value1 !== value2) return false;
}
}

return true;
}

module.exports = {isObject, deepEqual}
6 changes: 2 additions & 4 deletions src/proto/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ module.exports = {
if (typeof logEntry.labels === 'string') {
return logEntry
}
let protoLabels = `{level="${logEntry.labels.level}"`
delete logEntry.labels.level
let protoLabels = ''
for (const key in logEntry.labels) {
protoLabels += `,${key}="${logEntry.labels[key]}"`
}
protoLabels += '}'
logEntry.labels = protoLabels
logEntry.labels = `{${protoLabels.substring(1)}}`
return logEntry
})
return batch
Expand Down
19 changes: 10 additions & 9 deletions test/batcher.json.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ describe('Batcher tests with JSON transport', function () {
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[2]))
expect(batcher.batch.streams.length).toBe(1)
})
it('Should add items with different labels in separate streams', function () {
// Skip as there are currently no way to add dynamic labels
it.skip('Should add items with different labels in separate streams', function () {
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[0]))
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[2]))
expect(batcher.batch.streams.length).toBe(2)
Expand All @@ -93,7 +94,7 @@ describe('Batcher tests with JSON transport', function () {
it('Should be able to clear the batch of streams', function () {
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[0]))
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[2]))
expect(batcher.batch.streams.length).toBe(2)
expect(batcher.batch.streams.length).toBe(1)
batcher.clearBatch()
expect(batcher.batch.streams.length).toBe(0)
})
Expand All @@ -108,10 +109,10 @@ describe('Batcher tests with JSON transport', function () {
}
}
req.post.mockResolvedValue(responseObject)
batcher.pushLogEntry(fixtures.logs[1])
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[1]))

expect(req.post.mock.calls[0][req.post.mock.calls[0].length - 1]).toBe(
JSON.stringify({ streams: [JSON.parse(fixtures.logs_mapped_after[1])] })
expect(req.post.mock.calls[0][req.post.mock.calls[0].length - 2]).toBe(
JSON.stringify({ streams: [JSON.parse(fixtures.logs_mapped_after_json[1])] })
)
})
it('Should clear batch and resolve on successful send', async function () {
Expand All @@ -122,14 +123,14 @@ describe('Batcher tests with JSON transport', function () {
}
}
req.post.mockResolvedValue(responseObject)
batcher.pushLogEntry(fixtures.logs[0])
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[0]))

expect(batcher.batch.streams.length).toBe(1)

await batcher.sendBatchToLoki()

expect(req.post.mock.calls[0][req.post.mock.calls[0].length - 1]).toBe(
JSON.stringify({ streams: [JSON.parse(fixtures.logs_mapped_after[0])] })
expect(req.post.mock.calls[0][req.post.mock.calls[0].length - 2]).toBe(
JSON.stringify({ streams: [JSON.parse(fixtures.logs_mapped_after_json[0])] })
)
expect(batcher.batch.streams.length).toBe(0)
})
Expand Down Expand Up @@ -193,7 +194,7 @@ describe('Batcher tests with JSON transport', function () {

batcher.circuitBreakerInterval = circuitBreakerInterval
batcher.run()
batcher.pushLogEntry(fixtures.logs[0])
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[0]))

expect(batcher.batch.streams.length).toBe(1)
expect(batcher.interval).toBe(fixtures.options_json.interval * 1000)
Expand Down
28 changes: 15 additions & 13 deletions test/batcher.protobuf.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,20 @@ describe('Batcher tests with Protobuf + gRPC transport', function () {
batcher = new Batcher(options)

const logEntryConverted = createProtoTimestamps(
fixtures.logs[1]
JSON.parse(fixtures.logs_mapped_before[1])
)
const preparedLogEntry = prepareProtoBatch({ streams: [logEntryConverted] })
const stub = await jest.spyOn(batcher, 'sendBatchToLoki')
const stub = await jest.spyOn(batcher, 'sendBatchToLoki').mockImplementation((logEntry) => {
expect(logEntry).toEqual(logEntryConverted)
})

batcher.pushLogEntry(fixtures.logs[1])
expect(stub).toHaveBeenCalledWith(preparedLogEntry)
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[1]))
expect(stub).toHaveBeenCalledTimes(1)
stub.mockRestore()
})
it('Should be able to clear the batch of streams', function () {
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[0]))
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[2]))
expect(batcher.batch.streams.length).toBe(2)
expect(batcher.batch.streams.length).toBe(1)
batcher.clearBatch()
expect(batcher.batch.streams.length).toBe(0)
})
Expand All @@ -61,16 +62,17 @@ describe('Batcher tests with Protobuf + gRPC transport', function () {
})
it("Should fail if snappy can't compress the buffer", async function () {
batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[2]))
this.finish = await jest.spyOn(
logproto.PushRequest.encode(batcher.batch),
'finish'
this.encode = await jest.spyOn(
logproto.PushRequest,
'encode'
)
this.finish.mockReturnValue(null)
this.encode.mockReturnValue(null)
try {
await batcher.sendBatchToLoki()
} catch (error) {
expect(error).toBeTruthy()
}
this.encode.mockRestore()
})
it('Should wrap single logEntry in {streams: []} if batching is disabled', async function () {
const options = JSON.parse(JSON.stringify(fixtures.options_protobuf))
Expand All @@ -83,18 +85,18 @@ describe('Batcher tests with Protobuf + gRPC transport', function () {
}
}
req.post.mockResolvedValue(responseObject)
await batcher.pushLogEntry(fixtures.logs[1])
await batcher.pushLogEntry(JSON.parse(fixtures.logs_mapped_before[1]))

const logEntryConverted = createProtoTimestamps(
fixtures.logs[1]
JSON.parse(fixtures.logs_mapped_before[1])
)
const preparedLogEntry = prepareProtoBatch({ streams: [logEntryConverted] })
const buffer = logproto.PushRequest.encode(preparedLogEntry).finish()

const snappy = require('snappy')
const data = snappy.compressSync(buffer)
expect(
req.post.mock.calls[0][req.post.mock.calls[0].length - 1]
req.post.mock.calls[0][req.post.mock.calls[0].length - 2]
).toEqual(data)
})
it('Should construct without snappy binaries to a JSON transport', function () {
Expand Down
3 changes: 2 additions & 1 deletion test/custom-labels-lines.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('Integration tests', function () {
host: 'http://localhost',
level: 'debug',
interval: 10,
json: true,
labels: {
module: 'name',
app: 'appname'
Expand All @@ -52,7 +53,7 @@ describe('Integration tests', function () {
expect(
lokiTransport.batcher.batch.streams[0]
).toEqual({
labels: { level: 'debug', module: 'name', app: 'appname', customLabel: testLabel },
labels: { module: 'name', app: 'appname'},
entries: [{
line: `[name] ${testMessage}`,
ts: expect.toBeWithinRange(now - 5, now + 5)
Expand Down
20 changes: 10 additions & 10 deletions test/fixtures.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"options_json": { "host": "http://localhost", "interval": 0.1, "json": true },
"options_json": { "host": "http://localhost", "labels": {"alabel": "avalue", "job": "winston-loki"},"interval": 0.1, "json": true },
"options_no_batching": { "host": "http://localhost", "batching": false },
"options_protobuf": {
"host": "http://localhost",
Expand Down Expand Up @@ -36,15 +36,15 @@
}
],
"logs_mapped_before": [
"{\"labels\":{\"job\":\"test\", \"level\":\"info\"},\"entries\":[{\"ts\":1546977515828,\"line\":\"testings \"}]}",
"{\"labels\":\"{\"job\":\"test\", \"level\":\"error\"}\",\"entries\":[{\"ts\":1546977615848,\"line\":\"you broke everything \"} ]}",
"{\"labels\":{\"level\":\"error\",\"job\":\"test\"}, \"entries\":[{\"ts\":1546977615848,\"line\":\"you broke everything but not quite \"}]}",
"{\"labels\":{\"level\":\"error\", \"jeejee\":\"ebon\", \"job\":\"test\"},\"entries\":[{\"ts\":1546977515858,\"line\":\"you broke everything but not quite \"}]}"],
"logs_mapped_after": [
"{\"stream\":\"{job=\\\"test\\\", level=\\\"info\\\"}\",\"values\":[1546977515828,\"testings \"]}",
"{\"stream\":\"{job=\\\"test\\\", level=\\\"error\\\"}\",\"values\":[1546977615848,\"you broke everything \"]}",
"{\"stream\":\"{job=\\\"test\\\", level=\\\"error\\\"}\",\"values\":[1546977615848,\"you broke everything but not quite \"]}",
"{\"stream\":\"{job=\\\"test\\\", level=\\\"error\\\", jeejee=\\\"ebon\\\"}\",\"values\":[1546977515858,\"you broke everything but not quite \"]}"
"{\"labels\":{\"job\":\"winston-loki\", \"alabel\":\"avalue\"},\"entries\":[{\"ts\":1546977515828,\"line\":\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977515828,\\\"message\\\":\\\"testings\\\",\\\"level\\\":\\\"info\\\"}\"}]}",
"{\"labels\":{\"job\":\"winston-loki\", \"alabel\":\"avalue\"},\"entries\":[{\"ts\":1546977615848,\"line\":\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977615848,\\\"message\\\":\\\"you broke everything\\\",\\\"level\\\":\\\"error\\\"}\"} ]}",
"{\"labels\":{\"alabel\":\"avalue\",\"job\":\"winston-loki\"}, \"entries\":[{\"ts\":1546977615848,\"line\":\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977615848,\\\"message\\\":\\\"you broke everything but not quite\\\",\\\"level\\\":\\\"error\\\"}\"}]}",
"{\"labels\":{\"alabel\":\"avalue\",\"job\":\"winston-loki\"},\"entries\":[{\"ts\":1546977515858,\"line\":\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977515858,\\\"message\\\":\\\"you broke everything but not quite\\\",\\\"level\\\":\\\"error\\\",\\\"labels\\\":{\\\"jeejee\\\":\\\"ebon\\\"}}\"}]}"],
"logs_mapped_after_json": [
"{\"stream\":{\"job\":\"winston-loki\", \"alabel\":\"avalue\"},\"values\":[[\"1546977515828000000\",\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977515828,\\\"message\\\":\\\"testings\\\",\\\"level\\\":\\\"info\\\"}\"]]}",
"{\"stream\":{\"job\":\"winston-loki\", \"alabel\":\"avalue\"},\"values\":[[\"1546977615848000000\",\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977615848,\\\"message\\\":\\\"you broke everything\\\",\\\"level\\\":\\\"error\\\"}\"]]}",
"{\"stream\":{\"alabel\":\"avalue\",\"job\":\"winston-loki\"},\"values\":[[\"1546977615848000000\",\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977615848,\\\"message\\\":\\\"you broke everything but not quite\\\",\\\"level\\\":\\\"error\\\"}\"]]}",
"{\"stream\":{\"alabel\":\"avalue\",\"job\":\"winston-loki\"},\"values\":[[\"1546977515858000000\",\"{\\\"label\\\":\\\"test\\\",\\\"timestamp\\\":1546977515858,\\\"message\\\":\\\"you broke everything but not quite\\\",\\\"level\\\":\\\"error\\\",\\\"labels\\\":{\\\"jeejee\\\":\\\"ebon\\\"}}\"]]}"
],
"incorrect_mapping": "{ \"labelings\": \"{jobbings=\\\"test\\\", levelings=\\\"info\\\"}\", \"entries\": [ { \"tisisnotit\": 1546977515828, \"dontdodisline\": \"testings {}\" } ] }"
}
13 changes: 1 addition & 12 deletions test/transport.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,7 @@ describe('Integration tests', function () {
const lokiTransport = new LokiTransport(fixtures.options_json)
lokiTransport.log(fixtures.logs[0], () => {})
expect(lokiTransport.batcher.batch.streams.length).toBe(1)
expect(lokiTransport.batcher.batch.streams[0]).toEqual(
{
entries: [{
line: 'testings ',
ts: 1546977515828
}],
labels: {
job: 'test',
level: 'info'
}
}
)
expect(lokiTransport.batcher.batch.streams[0]).toEqual(JSON.parse(fixtures.logs_mapped_before[0]))
})
it("LokiTransport should append anything else than the message after it in the log's entry", function () {
const lokiTransport = new LokiTransport(fixtures.options_json)
Expand Down