diff --git a/CHANGELOG.md b/CHANGELOG.md index 48cccdfb077..61b60d5d608 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21786](https://github.com/influxdata/influxdb/pull/21786): Ported the `influxd inspect deletetsm` command from 1.x. 1. [21802](https://github.com/influxdata/influxdb/pull/21802): Removed unused `chronograf-migator` package & chronograf API service, and updated various "chronograf" references. 1. [21828](https://github.com/influxdata/influxdb/pull/21828): Added the command `influx inspect verify-wal`. +1. [21814](https://github.com/influxdata/influxdb/pull/21814): Ported the `influxd inspect report-tsm` command from 1.x. ### Bug Fixes diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 3a43a4a9782..43fe941ca90 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi" + "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsm" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm" @@ -40,6 +41,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { base.AddCommand(dump_tsi.NewDumpTSICommand()) base.AddCommand(delete_tsm.NewDeleteTSMCommand()) base.AddCommand(verify_wal.NewVerifyWALCommand()) + base.AddCommand(report_tsm.NewReportTSMCommand()) return base, nil } diff --git a/cmd/influxd/inspect/report_tsm/report_tsm.go b/cmd/influxd/inspect/report_tsm/report_tsm.go new file mode 100644 index 00000000000..402151ef9d8 --- /dev/null +++ b/cmd/influxd/inspect/report_tsm/report_tsm.go @@ -0,0 +1,366 @@ +package report_tsm + +import ( + "errors" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "text/tabwriter" + "time" + + "github.com/influxdata/influxdb/v2/internal/fs" + "github.com/influxdata/influxdb/v2/models" + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/retailnext/hllpp" + "github.com/spf13/cobra" +) + +type args struct { + dir string + pattern string + detailed bool + exact bool +} + +func NewReportTSMCommand() *cobra.Command { + var arguments args + cmd := &cobra.Command{ + Use: "report-tsm", + Short: "Run TSM report", + Long: ` +This command will analyze TSM files within a storage engine directory, reporting +the cardinality within the files as well as the time range that the point data +covers. +This command only interrogates the index within each file, and does not read any +block data. To reduce heap requirements, by default report-tsm estimates the +overall cardinality in the file set by using the HLL++ algorithm. Exact +cardinalities can be determined by using the --exact flag. +For each file, the following is output: + * The full filename; + * The series cardinality within the file; + * The number of series first encountered within the file; + * The min and max timestamp associated with TSM data in the file; and + * The time taken to load the TSM index and apply any tombstones. +The summary section then outputs the total time range and series cardinality for +the fileset. Depending on the --detailed flag, series cardinality is segmented +in the following ways: + * Series cardinality for each organization; + * Series cardinality for each bucket; + * Series cardinality for each measurement; + * Number of field keys for each measurement; and + * Number of tag values for each tag key.`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + + // Verify if shard dir + err := arguments.isShardDir(arguments.dir) + if arguments.detailed && err != nil { + return errors.New("--detailed only supported for shard dirs") + } + + return arguments.Run(cmd) + }, + } + + cmd.Flags().StringVarP(&arguments.pattern, "pattern", "", "", "only process TSM files containing pattern") + cmd.Flags().BoolVarP(&arguments.exact, "exact", "", false, "calculate and exact cardinality count. Warning, may use significant memory...") + cmd.Flags().BoolVarP(&arguments.detailed, "detailed", "", false, "emit series cardinality segmented by measurements, tag keys and fields. Warning, may take a while.") + + dir, err := fs.InfluxDir() + if err != nil { + panic(err) + } + dir = filepath.Join(dir, "engine/data") + cmd.Flags().StringVarP(&arguments.dir, "data-dir", "", dir, "use provided data directory") + + return cmd +} + +func (a *args) isShardDir(dir string) error { + name := filepath.Base(dir) + if id, err := strconv.Atoi(name); err != nil || id < 1 { + return fmt.Errorf("not a valid shard dir: %s", dir) + } + + return nil +} + +func (a *args) Run(cmd *cobra.Command) error { + // Create the cardinality counter + newCounterFn := newHLLCounter + estTitle := " (est)" + if a.exact { + estTitle = "" + newCounterFn = newExactCounter + } + + totalSeries := newCounterFn() + tagCardinalities := map[string]counter{} + measCardinalities := map[string]counter{} + fieldCardinalities := map[string]counter{} + + dbCardinalities := map[string]counter{} + + start := time.Now() + + tw := tabwriter.NewWriter(cmd.OutOrStdout(), 8, 2, 1, ' ', 0) + _, _ = fmt.Fprintln(tw, strings.Join([]string{"DB", "RP", "Shard", "File", "Series", "New" + estTitle, "Min Time", "Max Time", "Load Time"}, "\t")) + + minTime, maxTime := int64(math.MaxInt64), int64(math.MinInt64) + var fileCount int + if err := a.walkShardDirs(a.dir, func(db, rp, id, path string) error { + if a.pattern != "" && !strings.Contains(path, a.pattern) { + return nil + } + + file, err := os.OpenFile(path, os.O_RDONLY, 0600) + if err != nil { + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "error opening %q, skipping: %v\n", path, err) + return nil + } + + loadStart := time.Now() + reader, err := tsm1.NewTSMReader(file) + if err != nil { + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "error reading %q, skipping: %v\n", file.Name(), err) + return nil + } + loadTime := time.Since(loadStart) + fileCount++ + + dbCount, ok := dbCardinalities[db] + if !ok { + dbCount = newCounterFn() + dbCardinalities[db] = dbCount + } + + oldCount := dbCount.Count() + + seriesCount := reader.KeyCount() + for i := 0; i < seriesCount; i++ { + key, _ := reader.KeyAt(i) + totalSeries.Add(key) + dbCount.Add(key) + + if a.detailed { + sep := strings.Index(string(key), "#!~#") + seriesKey, field := key[:sep], key[sep+4:] + measurement, tags := models.ParseKey(seriesKey) + + measCount, ok := measCardinalities[measurement] + if !ok { + measCount = newCounterFn() + measCardinalities[measurement] = measCount + } + measCount.Add(key) + + fieldCount, ok := fieldCardinalities[measurement] + if !ok { + fieldCount = newCounterFn() + fieldCardinalities[measurement] = fieldCount + } + fieldCount.Add(field) + + for _, t := range tags { + tagCount, ok := tagCardinalities[string(t.Key)] + if !ok { + tagCount = newCounterFn() + tagCardinalities[string(t.Key)] = tagCount + } + tagCount.Add(t.Value) + } + } + } + minT, maxT := reader.TimeRange() + if minT < minTime { + minTime = minT + } + if maxT > maxTime { + maxTime = maxT + } + err = reader.Close() + if err != nil { + return fmt.Errorf("failed to close TSM Reader: %v", err) + } + + _, _ = fmt.Fprintln(tw, strings.Join([]string{ + db, rp, id, + filepath.Base(file.Name()), + strconv.FormatInt(int64(seriesCount), 10), + strconv.FormatInt(int64(dbCount.Count()-oldCount), 10), + time.Unix(0, minT).UTC().Format(time.RFC3339Nano), + time.Unix(0, maxT).UTC().Format(time.RFC3339Nano), + loadTime.String(), + }, "\t")) + if a.detailed { + err = tw.Flush() + if err != nil { + return fmt.Errorf("failed to flush tabwriter: %v", err) + } + } + return nil + }); err != nil { + return err + } + + err := tw.Flush() + if err != nil { + return fmt.Errorf("failed to flush tabwriter: %v", err) + } + + printSummary(cmd, printArgs{ + fileCount: fileCount, + minTime: minTime, + maxTime: maxTime, + estTitle: estTitle, + totalSeries: totalSeries, + detailed: a.detailed, + tagCardinalities: tagCardinalities, + measCardinalities: measCardinalities, + fieldCardinalities: fieldCardinalities, + dbCardinalities: dbCardinalities, + }) + + cmd.Printf("Completed in %s\n", time.Since(start)) + return nil +} + +type printArgs struct { + fileCount int + minTime, maxTime int64 + estTitle string + totalSeries counter + detailed bool + + tagCardinalities map[string]counter + measCardinalities map[string]counter + fieldCardinalities map[string]counter + dbCardinalities map[string]counter +} + +func printSummary(cmd *cobra.Command, p printArgs) { + cmd.Printf("\nSummary:") + cmd.Printf(" Files: %d\n", p.fileCount) + cmd.Printf(" Time Range: %s - %s\n", + time.Unix(0, p.minTime).UTC().Format(time.RFC3339Nano), + time.Unix(0, p.maxTime).UTC().Format(time.RFC3339Nano), + ) + cmd.Printf(" Duration: %s \n\n", time.Unix(0, p.maxTime).Sub(time.Unix(0, p.minTime))) + + cmd.Printf("Statistics\n") + cmd.Printf(" Series:\n") + for db, counts := range p.dbCardinalities { + cmd.Printf(" - %s%s: %d (%d%%)\n", db, p.estTitle, counts.Count(), int(float64(counts.Count())/float64(p.totalSeries.Count())*100)) + } + cmd.Printf(" Total%s: %d\n", p.estTitle, p.totalSeries.Count()) + + if p.detailed { + cmd.Printf("\n Measurements (est):\n") + for _, t := range sortKeys(p.measCardinalities) { + cmd.Printf(" - %v: %d (%d%%)\n", t, p.measCardinalities[t].Count(), int((float64(p.measCardinalities[t].Count())/float64(p.totalSeries.Count()))*100)) + } + + cmd.Printf("\n Fields (est):\n") + for _, t := range sortKeys(p.fieldCardinalities) { + cmd.Printf(" - %v: %d\n", t, p.fieldCardinalities[t].Count()) + } + + cmd.Printf("\n Tags (est):\n") + for _, t := range sortKeys(p.tagCardinalities) { + cmd.Printf(" - %v: %d\n", t, p.tagCardinalities[t].Count()) + } + } +} + +// sortKeys is a quick helper to return the sorted set of a map's keys +func sortKeys(vals map[string]counter) (keys []string) { + for k := range vals { + keys = append(keys, k) + } + sort.Strings(keys) + + return keys +} + +func (a *args) walkShardDirs(root string, fn func(db, rp, id, path string) error) error { + type location struct { + db, rp, id, path string + } + + var tsms []location + if err := filepath.WalkDir(root, func(path string, info os.DirEntry, err error) error { + if err != nil { + return err + } + + if info.IsDir() { + return nil + } + + if filepath.Ext(info.Name()) == "."+tsm1.TSMFileExtension { + shardDir := filepath.Dir(path) + + if err := a.isShardDir(shardDir); err != nil { + return err + } + absPath, err := filepath.Abs(path) + if err != nil { + return err + } + parts := strings.Split(absPath, string(filepath.Separator)) + db, rp, id := parts[len(parts)-4], parts[len(parts)-3], parts[len(parts)-2] + tsms = append(tsms, location{db: db, rp: rp, id: id, path: path}) + return nil + } + return nil + }); err != nil { + return err + } + + sort.Slice(tsms, func(i, j int) bool { + a, _ := strconv.Atoi(tsms[i].id) + b, _ := strconv.Atoi(tsms[j].id) + return a < b + }) + + for _, shard := range tsms { + if err := fn(shard.db, shard.rp, shard.id, shard.path); err != nil { + return err + } + } + return nil +} + +// counter abstracts a a method of counting keys. +type counter interface { + Add(key []byte) + Count() uint64 +} + +// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation. +func newHLLCounter() counter { + return hllpp.New() +} + +// exactCounter returns an exact count for keys using counting all distinct items in a set. +type exactCounter struct { + m map[string]struct{} +} + +func (c *exactCounter) Add(key []byte) { + c.m[string(key)] = struct{}{} +} + +func (c *exactCounter) Count() uint64 { + return uint64(len(c.m)) +} + +func newExactCounter() counter { + return &exactCounter{ + m: make(map[string]struct{}), + } +} diff --git a/cmd/influxd/inspect/report_tsm/report_tsm_test.go b/cmd/influxd/inspect/report_tsm/report_tsm_test.go new file mode 100644 index 00000000000..fd25b7c79e6 --- /dev/null +++ b/cmd/influxd/inspect/report_tsm/report_tsm_test.go @@ -0,0 +1,247 @@ +package report_tsm + +import ( + "bytes" + "encoding/binary" + "io" + "os" + "strconv" + "testing" + + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/stretchr/testify/require" +) + +func Test_Invalid_NotDir(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + file, err := os.CreateTemp(dir, "") + require.NoError(t, err) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: file.Name(), + expectOut: []string{"Files: 0"}, + }) +} + +func Test_Invalid_EmptyDir(t *testing.T) { + var info dirInfo + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"Files: 0"}, + }) +} + +func Test_Invalid_NotTSMFile(t *testing.T) { + info := dirInfo{ + numFiles: 1, + tsm: tsmInfo{ + withFile: true, + emptyTSM: true, + }, + } + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"Files: 0"}, + }) +} + +func Test_Invalid_EmptyFile(t *testing.T) { + info := dirInfo{ + numFiles: 1, + tsm: tsmInfo{ + withTSMFile: true, + emptyTSM: true, + }, + } + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"error reading magic number of file"}, + }) +} + +func Test_Invalid_BadFile(t *testing.T) { + info := dirInfo{ + numFiles: 1, + tsm: tsmInfo{ + withTSMFile: true, + invalidTSM: true, + }, + } + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"can only read from tsm file"}, + }) +} + +func Test_Invalid_BadFile_WithGoodFiles(t *testing.T) { + info := dirInfo{ + numFiles: 3, + tsm: tsmInfo{ + withTSMFile: true, + invalidTSM: true, + }, + } + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{ + "can only read from tsm file", // bad file + "Files: 2", // 2 other good files + }, + }) +} + +func Test_Valid_SingleFile(t *testing.T) { + info := dirInfo{ + numFiles: 1, + tsm: tsmInfo{ + withTSMFile: true, + }, + } + dir := makeTempDir(t, "", info) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"Files: 1"}, + }) +} + +func Test_Valid_MultipleFiles_SingleDir(t *testing.T) { + info := dirInfo{ + numFiles: 3, + tsm: tsmInfo{ + withTSMFile: true, + }, + } + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"Files: 3"}, + }) +} + +func Test_Valid_MultipleFiles_MultipleDirs(t *testing.T) { + info := dirInfo{ + numFiles: 3, + subDirs: 3, + tsm: tsmInfo{ + withTSMFile: true, + }, + } + dir := makeTempDir(t, "", info) + defer os.RemoveAll(dir) + + runCommand(t, testInfo{ + dir: dir, + expectOut: []string{"Files: 12"}, + }) +} + +type dirInfo struct { + tsm tsmInfo + numFiles int + subDirs int + + subDirIndex int // Used for recursion only +} + +type tsmInfo struct { + withFile bool + withTSMFile bool + + emptyTSM bool + invalidTSM bool +} + +type testInfo struct { + dir string + expectOut []string +} + +func runCommand(t *testing.T, info testInfo) { + cmd := NewReportTSMCommand() + cmd.SetArgs([]string{"--data-dir", info.dir}) + + b := bytes.NewBufferString("") + cmd.SetOut(b) + cmd.SetErr(b) + + require.NoError(t, cmd.Execute()) + + out, err := io.ReadAll(b) + require.NoError(t, err) + for _, entry := range info.expectOut { + require.Contains(t, string(out), entry) + } +} + +// makeTempDir returns the path to the root temporary directory +func makeTempDir(t *testing.T, startDir string, info dirInfo) string { + t.Helper() + + dir, err := os.MkdirTemp(startDir, strconv.Itoa(info.subDirIndex)) + require.NoError(t, err) + + // Make subdirectories + if info.subDirIndex == 0 { + for i := 0; i < info.subDirs; i++ { + info.subDirIndex += 1 + makeTempDir(t, dir, info) + } + } + + // Make TSM files + for i := 0; i < info.numFiles; i++ { + makeTempTSM(t, dir, info.tsm) + info.tsm.invalidTSM = false // only do 1 max invalid TSM file, as the tests desire + } + return dir +} + +func makeTempTSM(t *testing.T, dir string, info tsmInfo) { + t.Helper() + + if info.withFile || info.withTSMFile { + var ext string + if info.withTSMFile { + ext = tsm1.TSMFileExtension + } else { + ext = "txt" + } + file, err := os.CreateTemp(dir, "reporttsm*."+ext) + require.NoError(t, err) + + if !info.emptyTSM { + w, err := tsm1.NewTSMWriter(file) + require.NoError(t, err) + defer w.Close() + + values := []tsm1.Value{tsm1.NewValue(0, 1.0)} + require.NoError(t, w.Write([]byte("cpu"), values)) + + if info.invalidTSM { + require.NoError(t, binary.Write(file, binary.BigEndian, []byte("foobar\n"))) + } + + require.NoError(t, w.WriteIndex()) + } + } +} diff --git a/go.mod b/go.mod index 95383e81d36..677da74a641 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 + github.com/retailnext/hllpp v1.0.0 github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/spf13/cast v1.3.0 github.com/spf13/cobra v1.0.0 diff --git a/go.sum b/go.sum index 38a2679d2a5..dd3d301ff15 100644 --- a/go.sum +++ b/go.sum @@ -517,6 +517,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/retailnext/hllpp v1.0.0 h1:7+NffI2mo7lZG78NruEsf3jEnjJ6Z0n1otEyFqdK8zA= +github.com/retailnext/hllpp v1.0.0/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=