Skip to content

Commit

Permalink
Merge pull request #1475 from cybozu/promtail-force-flush-partial-line
Browse files Browse the repository at this point in the history
promtail: flush partial lines which are unsent for a while
  • Loading branch information
umezawatakeshi authored Nov 22, 2024
2 parents 17246cc + 26af479 commit 0a8a5dd
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 1 deletion.
2 changes: 2 additions & 0 deletions promtail/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ RUN meson setup /work/systemd/build/ /work/systemd/ && \
meson install -C /work/systemd/build/

RUN git clone --depth=1 -b v${LOKI_VERSION} https://github.com/grafana/loki /work/loki
COPY force-flush-partial-line.patch /work/
RUN cd loki && patch -p1 < ../force-flush-partial-line.patch
RUN make -C /work/loki BUILD_IN_CONTAINER=false PROMTAIL_JOURNAL_ENABLED=true promtail

FROM ghcr.io/cybozu/ubuntu:22.04
Expand Down
2 changes: 1 addition & 1 deletion promtail/TAG
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.1.1
3.2.1.2
150 changes: 150 additions & 0 deletions promtail/force-flush-partial-line.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
diff --git a/clients/pkg/logentry/stages/extensions.go b/clients/pkg/logentry/stages/extensions.go
index c7ebdd18f..bd7601a1a 100644
--- a/clients/pkg/logentry/stages/extensions.go
+++ b/clients/pkg/logentry/stages/extensions.go
@@ -3,6 +3,7 @@ package stages
import (
"strings"
"sync"
+ "time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -14,8 +15,9 @@ import (
)

const (
- RFC3339Nano = "RFC3339Nano"
- MaxPartialLinesSize = 100 // Max buffer size to hold partial lines.
+ RFC3339Nano = "RFC3339Nano"
+ MaxPartialLinesSize = 100 // Max buffer size to hold partial lines.
+ DefaultMaxPartialLineAge = time.Minute
)

// NewDocker creates a Docker json log format specific pipeline stage.
@@ -68,7 +70,7 @@ func (*cri) Cleanup() {
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)

- in := RunWithSkipOrSendMany(entry, func(e Entry) ([]Entry, bool) {
+ in := RunWithSkipOrSendManyWithTick(entry, func(e Entry) ([]Entry, bool) {
fingerprint := e.Labels.Fingerprint()

// We received partial-line (tag: "P")
@@ -115,6 +117,23 @@ func (c *cri) Run(entry chan Entry) chan Entry {
delete(c.partialLines, fingerprint)
}
return []Entry{e}, false
+ }, 10*time.Second, func() []Entry {
+ // Send partial lines which are left unsent for a while.
+ threshold := time.Now().Add(-c.cfg.MaxPartialLineAge)
+
+ entries := make([]Entry, 0)
+ fingerprints := make([]model.Fingerprint, 0)
+ for k, v := range c.partialLines {
+ if v.Timestamp.Before(threshold) {
+ level.Warn(c.base.logger).Log("msg", "flushing partial line due to max age", "labels", v.Labels)
+ entries = append(entries, v)
+ fingerprints = append(fingerprints, k)
+ }
+ }
+ for _, fp := range fingerprints {
+ delete(c.partialLines, fp)
+ }
+ return entries
})

return in
@@ -131,6 +150,7 @@ type CriConfig struct {
MaxPartialLines int `mapstructure:"max_partial_lines"`
MaxPartialLineSize flagext.ByteSize `mapstructure:"max_partial_line_size"`
MaxPartialLineSizeTruncate bool `mapstructure:"max_partial_line_size_truncate"`
+ MaxPartialLineAge time.Duration `mapstructure:"max_partial_line_age"`
}

// validateCriConfig validates the CriConfig for the cri stage
@@ -138,6 +158,9 @@ func validateCriConfig(cfg *CriConfig) error {
if cfg.MaxPartialLines == 0 {
cfg.MaxPartialLines = MaxPartialLinesSize
}
+ if cfg.MaxPartialLineAge == time.Duration(0) {
+ cfg.MaxPartialLineAge = DefaultMaxPartialLineAge
+ }
return nil
}

diff --git a/clients/pkg/logentry/stages/pipeline.go b/clients/pkg/logentry/stages/pipeline.go
index 288ea5190..a427b9015 100644
--- a/clients/pkg/logentry/stages/pipeline.go
+++ b/clients/pkg/logentry/stages/pipeline.go
@@ -3,6 +3,7 @@ package stages
import (
"context"
"sync"
+ "time"

"github.com/go-kit/log"
"github.com/pkg/errors"
@@ -98,7 +99,7 @@ func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Ent
return out
}

-// RunWithSkiporSendMany same as RunWithSkip, except it can either skip sending it to output channel, if `process` functions returns `skip` true. Or send many entries.
+// RunWithSkipOrSendMany same as RunWithSkip, except it can either skip sending it to output channel, if `process` functions returns `skip` true. Or send many entries.
func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry {
out := make(chan Entry)
go func() {
@@ -117,6 +118,38 @@ func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, boo
return out
}

+// RunWithSkipOrSendManyWithTick same as RunWithSkipOrSendMany, except it can run `tick` function periodically.
+func RunWithSkipOrSendManyWithTick(input chan Entry, process func(e Entry) ([]Entry, bool), interval time.Duration, tick func() []Entry) chan Entry {
+ out := make(chan Entry)
+ go func() {
+ defer close(out)
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case e, ok := <-input:
+ if !ok {
+ return
+ }
+ results, skip := process(e)
+ if skip {
+ continue
+ }
+ for _, result := range results {
+ out <- result
+ }
+ case <-ticker.C:
+ results := tick()
+ for _, result := range results {
+ out <- result
+ }
+ }
+ }
+ }()
+
+ return out
+}
+
// Run implements Stage
func (p *Pipeline) Run(in chan Entry) chan Entry {
in = RunWith(in, func(e Entry) Entry {
diff --git a/docs/sources/send-data/promtail/stages/cri.md b/docs/sources/send-data/promtail/stages/cri.md
index c78c92c80..4756a7f5a 100644
--- a/docs/sources/send-data/promtail/stages/cri.md
+++ b/docs/sources/send-data/promtail/stages/cri.md
@@ -23,6 +23,10 @@ cri:

# Allows to pretruncate partial lines before storing in partial buffer.
[max_partial_line_size_truncate: <bool> | default = false]
+
+ # The maximum duration of a partial line in memory.
+ # If a partial line is left unsent for longer than this, the line will be flushed as a complete line.
+ [max_partial_line_age: <duration> | default = 1m]
```

Unlike most stages, the `cri` stage provides no configuration options and only

0 comments on commit 0a8a5dd

Please sign in to comment.