Skip to content

Commit

Permalink
fix: make TSI index compact old and too-large log files (influxdata#2…
Browse files Browse the repository at this point in the history
…2334)

*  TSI index should compact old or too-large log files
* Old tsl files should be compacted without new writes
* Add extra logging when disk size test fails


Co-authored-by: Sam Arnold <[email protected]>
  • Loading branch information
danxmoran and lesam authored Aug 30, 2021
1 parent cc6accf commit 12fff64
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [22235](https://github.com/influxdata/influxdb/pull/22235): Avoid compaction queue stats flutter.
1. [22272](https://github.com/influxdata/influxdb/pull/22272): Requests to `/api/v2/authorizations` filter correctly on `org` and `user` parameters.
1. [22311](https://github.com/influxdata/influxdb/pull/22311): Enforce max field size while parsing line protocol.
1. [22334](https://github.com/influxdata/influxdb/pull/22334): Periodically compact old and large TSI files.

## v2.0.8 [2021-08-13]

Expand Down
15 changes: 15 additions & 0 deletions cmd/influxd/inspect/build_tsi/build_tsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,21 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i
return err
}

log.Debug("Reopening TSI index with max-index-log-file-size=1 to fully compact log files")
compactingIndex := tsi1.NewIndex(sfile, "",
tsi1.WithPath(tmpPath),
tsi1.WithMaximumLogFileSize(1),
)
if err := compactingIndex.Open(); err != nil {
return err
}
compactingIndex.Compact()
compactingIndex.Wait()
log.Debug("re-closing tsi index")
if err := compactingIndex.Close(); err != nil {
return err
}

// Rename TSI to standard path.
log.Debug("Moving tsi to permanent location")
return os.Rename(tmpPath, indexPath)
Expand Down
22 changes: 22 additions & 0 deletions tsdb/index/tsi1/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,28 @@ import (
"github.com/influxdata/influxdb/v2/tsdb"
)

// This function is used to log the components of disk size when DiskSizeBytes fails
func (i *Index) LogDiskSize(t *testing.T) {
fs, err := i.RetainFileSet()
if err != nil {
t.Log("could not retain fileset")
}
defer fs.Release()
var size int64
// Get MANIFEST sizes from each partition.
for count, p := range i.partitions {
sz := p.manifestSize
t.Logf("Parition %d has size %d", count, sz)
size += sz
}
for _, f := range fs.files {
sz := f.Size()
t.Logf("Size of file %s is %d", f.Path(), sz)
size += sz
}
t.Logf("Total size is %d", size)
}

func TestTagValueSeriesIDCache(t *testing.T) {
m0k0v0 := tsdb.NewSeriesIDSet(1, 2, 3, 4, 5)
m0k0v1 := tsdb.NewSeriesIDSet(10, 20, 30, 40, 50)
Expand Down
24 changes: 18 additions & 6 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/cespare/xxhash"
Expand Down Expand Up @@ -42,6 +43,7 @@ func init() {
idx := NewIndex(sfile, db,
WithPath(path),
WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)),
WithMaximumLogFileAge(time.Duration(opt.Config.CompactFullWriteColdDuration)),
WithSeriesIDCacheSize(opt.Config.SeriesIDSetCacheSize),
)
return idx
Expand Down Expand Up @@ -88,6 +90,12 @@ var WithMaximumLogFileSize = func(size int64) IndexOption {
}
}

var WithMaximumLogFileAge = func(dur time.Duration) IndexOption {
return func(i *Index) {
i.maxLogFileAge = dur
}
}

// DisableFsync disables flushing and syncing of underlying files. Primarily this
// impacts the LogFiles. This option can be set when working with the index in
// an offline manner, for cases where a hard failure can be overcome by re-running the tooling.
Expand Down Expand Up @@ -130,12 +138,13 @@ type Index struct {
tagValueCacheSize int

// The following may be set when initializing an Index.
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
logfileBufferSize int // The size of the buffer used by the LogFile.
disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline.
logger *zap.Logger // Index's logger.
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
maxLogFileAge time.Duration // Maximum age of a LogFile before it's compacted.
logfileBufferSize int // The size of the buffer used by the LogFile.
disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline.
logger *zap.Logger // Index's logger.

// The following must be set when initializing an Index.
sfile *tsdb.SeriesFile // series lookup file
Expand All @@ -161,6 +170,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *
idx := &Index{
tagValueCacheSize: tsdb.DefaultSeriesIDSetCacheSize,
maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
maxLogFileAge: tsdb.DefaultCompactFullWriteColdDuration,
logger: zap.NewNop(),
version: Version,
sfile: sfile,
Expand Down Expand Up @@ -193,6 +203,7 @@ func (i *Index) Bytes() int {
b += int(unsafe.Sizeof(i.path)) + len(i.path)
b += int(unsafe.Sizeof(i.disableCompactions))
b += int(unsafe.Sizeof(i.maxLogFileSize))
b += int(unsafe.Sizeof(i.maxLogFileAge))
b += int(unsafe.Sizeof(i.logger))
b += int(unsafe.Sizeof(i.sfile))
// Do not count SeriesFile because it belongs to the code that constructed this Index.
Expand Down Expand Up @@ -259,6 +270,7 @@ func (i *Index) Open() error {
for j := 0; j < len(i.partitions); j++ {
p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j)))
p.MaxLogFileSize = i.maxLogFileSize
p.MaxLogFileAge = i.maxLogFileAge
p.nosync = i.disableFsync
p.logbufferSize = i.logfileBufferSize
p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
Expand Down
86 changes: 66 additions & 20 deletions tsdb/index/tsi1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"sync"
"testing"
"time"

"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
Expand Down Expand Up @@ -222,6 +223,13 @@ func TestIndex_Open(t *testing.T) {
t.Fatalf("got index version %d, expected %d", got, exp)
}
}

for i := 0; i < int(idx.PartitionN); i++ {
p := idx.PartitionAt(i)
if got, exp := p.NeedsCompaction(), false; got != exp {
t.Fatalf("got needs compaction %v, expected %v", got, exp)
}
}
})

// Reopening an open index should return an error.
Expand Down Expand Up @@ -298,15 +306,25 @@ func TestIndex_DiskSizeBytes(t *testing.T) {
t.Fatal(err)
}

// Verify on disk size is the same in each stage.
// Each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
expSize := int64(4 * 9)

// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
expSize += int64(tsi1.DefaultPartitionN * 419)
idx.RunStateAware(t, func(t *testing.T, state int) {
// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
expSize := int64(tsi1.DefaultPartitionN * 419)
switch state {
case Initial:
fallthrough
case Reopen:
// In the log file, each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
expSize += 4 * 9
case PostCompaction:
fallthrough
case PostCompactionReopen:
// For TSI files after a compaction, instead of 4*9, we have encoded measurement names, tag names, etc which is larger
expSize += 2202
}

idx.Run(t, func(t *testing.T) {
if got, exp := idx.DiskSizeBytes(), expSize; got != exp {
// We had some odd errors - if the size is unexpected, log it
idx.Index.LogDiskSize(t)
t.Fatalf("got %d bytes, expected %d", got, exp)
}
})
Expand Down Expand Up @@ -517,7 +535,7 @@ func (idx *Index) Close() error {
}

// Reopen closes and opens the index.
func (idx *Index) Reopen() error {
func (idx *Index) Reopen(maxLogSize int64) error {
if err := idx.Index.Close(); err != nil {
return err
}
Expand All @@ -529,11 +547,24 @@ func (idx *Index) Reopen() error {
}

partitionN := idx.Index.PartitionN // Remember how many partitions to use.
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()), tsi1.WithMaximumLogFileSize(maxLogSize))
idx.Index.PartitionN = partitionN
return idx.Open()
}

const (
Initial = iota
Reopen
PostCompaction
PostCompactionReopen
)

func curryState(state int, f func(t *testing.T, state int)) func(t *testing.T) {
return func(t *testing.T) {
f(t, state)
}
}

// Run executes a subtest for each of several different states:
//
// - Immediately
Expand All @@ -544,27 +575,42 @@ func (idx *Index) Reopen() error {
// The index should always respond in the same fashion regardless of
// how data is stored. This helper allows the index to be easily tested
// in all major states.
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
func (idx *Index) RunStateAware(t *testing.T, fn func(t *testing.T, state int)) {
// Invoke immediately.
t.Run("state=initial", fn)
t.Run("state=initial", curryState(Initial, fn))

// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
if err := idx.Reopen(tsdb.DefaultMaxIndexLogFileSize); err != nil {
t.Fatalf("reopen error: %s", err)
}
t.Run("state=reopen", fn)
t.Run("state=reopen", curryState(Reopen, fn))

// TODO: Request a compaction.
// if err := idx.Compact(); err != nil {
// t.Fatalf("compact error: %s", err)
// }
// t.Run("state=post-compaction", fn)
// Reopen requiring a full compaction of the TSL files and invoke again.
idx.Reopen(1)
for {
needsCompaction := false
for i := 0; i < int(idx.PartitionN); i++ {
needsCompaction = needsCompaction || idx.PartitionAt(i).NeedsCompaction()
}
if !needsCompaction {
break
}
time.Sleep(10 * time.Millisecond)
}
t.Run("state=post-compaction", curryState(PostCompaction, fn))

// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
if err := idx.Reopen(tsdb.DefaultMaxIndexLogFileSize); err != nil {
t.Fatalf("post-compaction reopen error: %s", err)
}
t.Run("state=post-compaction-reopen", fn)
t.Run("state=post-compaction-reopen", curryState(PostCompactionReopen, fn))
}

// Run is the same is RunStateAware but for tests that do not depend on compaction state
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
idx.RunStateAware(t, func(t *testing.T, _ int) {
fn(t)
})
}

// CreateSeriesSliceIfNotExists creates multiple series at a time.
Expand Down
9 changes: 9 additions & 0 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {

// bytes estimates the memory footprint of this LogFile, in bytes.
func (f *LogFile) bytes() int {
f.mu.RLock()
defer f.mu.RUnlock()
var b int
b += 24 // mu RWMutex is 24 bytes
b += 16 // wg WaitGroup is 16 bytes
Expand Down Expand Up @@ -263,6 +265,13 @@ func (f *LogFile) Size() int64 {
return v
}

// ModTime returns the last modified time of the file
func (f *LogFile) ModTime() time.Time {
f.mu.RLock()
defer f.mu.RUnlock()
return f.modTime
}

// Measurement returns a measurement element.
func (f *LogFile) Measurement(name []byte) MeasurementElem {
f.mu.RLock()
Expand Down
Loading

0 comments on commit 12fff64

Please sign in to comment.