From 5d8c0b4b1768a866e7a17030628dec443c5da241 Mon Sep 17 00:00:00 2001 From: mcfarlm3 <58636946+mcfarlm3@users.noreply.github.com> Date: Mon, 19 Jul 2021 08:37:28 -0700 Subject: [PATCH] feat: port `influxd inspect report-tsi` to 2.x (#21788) * feat: port influxd inspect report-tsi * chore: restored tsi1.IsIndexDir() functionality * refactor: changed inspect report-tsi to work with 2.x file system structure * chore: fixed output tab alignment * chore: ran make fmt * chore: ran make fmt on changed file after resolving merge conflicts * refactor: made changes based on code review * refactor: avoiding defer build-up in loop * refactor: replaced path with filepath and added shardIdx closures * chore: updated changelog * refactor: handled shardIdx closures in error case Co-authored-by: Michelle McFarland --- CHANGELOG.md | 1 + cmd/influxd/inspect/inspect.go | 2 + cmd/influxd/inspect/report_tsi/report_tsi.go | 458 +++++++++++++++++++ 3 files changed, 461 insertions(+) create mode 100644 cmd/influxd/inspect/report_tsi/report_tsi.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ce5efd2781..48cccdfb077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21615](https://github.com/influxdata/influxdb/pull/21615): Ported the `influxd inspect verify-tsm` command from 1.x. 1. [21646](https://github.com/influxdata/influxdb/pull/21646): Ported the `influxd inspect verify-tombstone` command from 1.x. 1. [21761](https://github.com/influxdata/influxdb/pull/21761): Ported the `influxd inspect dump-tsm` command from 1.x. +1. [21788](https://github.com/influxdata/influxdb/pull/21788): Ported the `influxd inspect report-tsi` command from 1.x. 1. [21784](https://github.com/influxdata/influxdb/pull/21784): Ported the `influxd inspect dumptsi` command from 1.x. 1. [21786](https://github.com/influxdata/influxdb/pull/21786): Ported the `influxd inspect deletetsm` command from 1.x. 1. [21802](https://github.com/influxdata/influxdb/pull/21802): Removed unused `chronograf-migator` package & chronograf API service, and updated various "chronograf" references. diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 1ec61600300..3a43a4a9782 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_tsm" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp" + "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm" @@ -30,6 +31,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { return nil, err } base.AddCommand(exportLp) + base.AddCommand(report_tsi.NewReportTSICommand()) base.AddCommand(export_index.NewExportIndexCommand()) base.AddCommand(verify_tsm.NewTSMVerifyCommand()) base.AddCommand(verify_seriesfile.NewVerifySeriesfileCommand()) diff --git a/cmd/influxd/inspect/report_tsi/report_tsi.go b/cmd/influxd/inspect/report_tsi/report_tsi.go new file mode 100644 index 00000000000..3dbe9407fee --- /dev/null +++ b/cmd/influxd/inspect/report_tsi/report_tsi.go @@ -0,0 +1,458 @@ +// Package report_tsi provides a report about the series cardinality in one or more TSI indexes. +package report_tsi + +import ( + "errors" + "fmt" + "math" + "os" + "path/filepath" + "runtime" + "sort" + "strconv" + "sync/atomic" + "text/tabwriter" + + "github.com/influxdata/influxdb/v2/logger" + "github.com/influxdata/influxdb/v2/tsdb" + "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" + "github.com/spf13/cobra" +) + +const ( + // Number of series IDs to stored in slice before we convert to a roaring + // bitmap. Roaring bitmaps have a non-trivial initial cost to construct. + useBitmapN = 25 +) + +// reportTSI represents the program execution for "inspect report-tsi". +type reportTSI struct { + // Flags + bucketId string // required + enginePath string + topN int + concurrency int + + // Variables for calculating and storing cardinalities + sfile *tsdb.SeriesFile + shardPaths map[uint64]string + shardIdxs map[uint64]*tsi1.Index + cardinalities map[uint64]map[string]*cardinality +} + +// NewReportTSICommand returns a new instance of Command with default setting applied. +func NewReportTSICommand() *cobra.Command { + var arguments reportTSI + cmd := &cobra.Command{ + Use: "report-tsi", + Short: "Reports the cardinality of TSI files", + Long: `This command will analyze TSI files within a specified bucket, reporting the +cardinality of data within the files, segmented by shard and further by measurement.`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + + arguments.shardPaths = map[uint64]string{} + arguments.shardIdxs = map[uint64]*tsi1.Index{} + arguments.cardinalities = map[uint64]map[string]*cardinality{} + + return arguments.run() + }, + } + + cmd.Flags().StringVarP(&arguments.bucketId, "bucket-id", "b", "", "Required - specify which bucket to report on. A bucket id must be a base-16 string") + cmd.Flags().StringVar(&arguments.enginePath, "path", os.Getenv("HOME")+"/.influxdbv2/engine/data", "Path to engine.") + cmd.Flags().IntVarP(&arguments.topN, "top", "t", 0, "Limit results to top n") + cmd.Flags().IntVar(&arguments.concurrency, "c", runtime.GOMAXPROCS(0), "How many concurrent workers to run.") + cmd.MarkFlagRequired("bucket-id") + + return cmd +} + +// Run executes the command. +func (report *reportTSI) run() error { + // Get all shards from specified bucket + dirEntries, err := os.ReadDir(filepath.Join(report.enginePath, report.bucketId, "autogen")) + + if err != nil { + return err + } + + for _, entry := range dirEntries { + + if !entry.IsDir() { + continue + } + + if entry.Name() == tsdb.SeriesFileDirectory || entry.Name() == "index" { + continue + } + + id, err := strconv.Atoi(entry.Name()) + if err != nil { + continue + } + + report.shardPaths[uint64(id)] = filepath.Join(report.enginePath, report.bucketId, "autogen", entry.Name()) + } + + if len(report.shardPaths) == 0 { + fmt.Fprintf(os.Stderr, "No shards under %s\n", report.enginePath) + return nil + } + + report.sfile = tsdb.NewSeriesFile(filepath.Join(report.enginePath, report.bucketId, tsdb.SeriesFileDirectory)) + + config := logger.NewConfig() + newLogger, err := config.New(os.Stderr) + if err != nil { + return err + } + report.sfile.Logger = newLogger + + if err := report.sfile.Open(); err != nil { + return err + } + defer report.sfile.Close() + + // Blocks until all work done. + report.calculateCardinalities(report.cardinalityByMeasurement) + + allIDs := make([]uint64, 0, len(report.shardIdxs)) + + for id := range report.shardIdxs { + allIDs = append(allIDs, id) + } + + // Print summary. + err = report.printSummaryByMeasurement() + + for id := range report.shardIdxs { + report.shardIdxs[id].Close() + } + + if err != nil { + return err + } + + sort.Slice(allIDs, func(i int, j int) bool { return allIDs[i] < allIDs[j] }) + + for _, id := range allIDs { + if err := report.printShardByMeasurement(id); err != nil { + return err + } + } + return nil +} + +// calculateCardinalities calculates the cardinalities of the set of shard being +// worked on concurrently. The provided function determines how cardinality is +// calculated and broken down. +func (report *reportTSI) calculateCardinalities(fn func(id uint64) error) error { + // Get list of shards to work on. + shardIDs := make([]uint64, 0, len(report.shardPaths)) + for id := range report.shardPaths { + pth := filepath.Join(report.shardPaths[id], "index") + + report.shardIdxs[id] = tsi1.NewIndex(report.sfile, + "", + tsi1.WithPath(pth), + tsi1.DisableCompactions(), + ) + + // Initialise cardinality set to store cardinalities for each shard + report.cardinalities[id] = map[string]*cardinality{} + + shardIDs = append(shardIDs, id) + } + + errC := make(chan error, len(shardIDs)) + var maxi uint32 // index of maximum shard being worked on. + for k := 0; k < report.concurrency; k++ { + go func() { + for { + i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next shard to work on. + if i >= len(shardIDs) { + return // No more work. + } + errC <- fn(shardIDs[i]) + } + }() + } + + // Check for error + for i := 0; i < cap(errC); i++ { + if err := <-errC; err != nil { + return err + } + } + + return nil +} + +// Cardinality struct and methods +type cardinality struct { + name []byte + short []uint32 + set *tsdb.SeriesIDSet +} + +func (c *cardinality) add(x uint64) { + if c.set != nil { + c.set.AddNoLock(x) + return + } + + c.short = append(c.short, uint32(x)) // Series IDs never get beyond 2^32 + + // Cheaper to store in bitmap. + if len(c.short) > useBitmapN { + c.set = tsdb.NewSeriesIDSet() + for _, s := range c.short { + c.set.AddNoLock(uint64(s)) + } + c.short = nil + return + } +} + +func (c *cardinality) cardinality() int64 { + if c == nil || (c.short == nil && c.set == nil) { + return 0 + } + + if c.short != nil { + return int64(len(c.short)) + } + return int64(c.set.Cardinality()) +} + +type cardinalities []*cardinality + +func (a cardinalities) Len() int { return len(a) } +func (a cardinalities) Less(i, j int) bool { return a[i].cardinality() < a[j].cardinality() } +func (a cardinalities) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (report *reportTSI) cardinalityByMeasurement(shardID uint64) error { + if err := report.shardIdxs[shardID].Open(); err != nil { + return err + } + + idx := report.shardIdxs[shardID] + itr, err := idx.MeasurementIterator() + if err != nil { + return err + } else if itr == nil { + return nil + } + defer itr.Close() + +OUTER: + for { + name, err := itr.Next() + if err != nil { + return err + } else if name == nil { + break OUTER + } + + // Get series ID set to track cardinality under measurement. + c, ok := report.cardinalities[shardID][string(name)] + if !ok { + c = &cardinality{name: name} + report.cardinalities[shardID][string(name)] = c + } + + sitr, err := idx.MeasurementSeriesIDIterator(name) + if err != nil { + return err + } else if sitr == nil { + continue + } + + var e tsdb.SeriesIDElem + for e, err = sitr.Next(); err == nil && e.SeriesID != 0; e, err = sitr.Next() { + if e.SeriesID > math.MaxUint32 { + return fmt.Errorf("series ID is too large: %d (max %d)", e.SeriesID, uint32(math.MaxUint32)) + } + c.add(e.SeriesID) + } + sitr.Close() + + if err != nil { + return err + } + } + return nil +} + +type result struct { + name []byte + count int64 + + // For low cardinality measurements just track series using map + lowCardinality map[uint32]struct{} + + // For higher cardinality measurements track using bitmap. + set *tsdb.SeriesIDSet +} + +func (r *result) addShort(ids []uint32) { + // There is already a bitset of this result. + if r.set != nil { + for _, id := range ids { + r.set.AddNoLock(uint64(id)) + } + return + } + + // Still tracking low cardinality sets + if r.lowCardinality == nil { + r.lowCardinality = map[uint32]struct{}{} + } + + for _, id := range ids { + r.lowCardinality[id] = struct{}{} + } + + // Cardinality is large enough that we will benefit from using a bitmap + if len(r.lowCardinality) > useBitmapN { + r.set = tsdb.NewSeriesIDSet() + for id := range r.lowCardinality { + r.set.AddNoLock(uint64(id)) + } + r.lowCardinality = nil + } +} + +func (r *result) merge(other *tsdb.SeriesIDSet) { + if r.set == nil { + r.set = tsdb.NewSeriesIDSet() + for id := range r.lowCardinality { + r.set.AddNoLock(uint64(id)) + } + r.lowCardinality = nil + } + r.set.Merge(other) +} + +type results []*result + +func (a results) Len() int { return len(a) } +func (a results) Less(i, j int) bool { return a[i].count < a[j].count } +func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (report *reportTSI) printSummaryByMeasurement() error { + // Get global set of measurement names across shards. + idxs := &tsdb.IndexSet{SeriesFile: report.sfile} + for _, idx := range report.shardIdxs { + idxs.Indexes = append(idxs.Indexes, idx) + } + + mitr, err := idxs.MeasurementIterator() + if err != nil { + return err + } else if mitr == nil { + return errors.New("got nil measurement iterator for index set") + } + defer mitr.Close() + + var name []byte + var totalCardinality int64 + measurements := results{} + for name, err = mitr.Next(); err == nil && name != nil; name, err = mitr.Next() { + res := &result{name: name} + for _, shardCards := range report.cardinalities { + other, ok := shardCards[string(name)] + if !ok { + continue // this shard doesn't have anything for this measurement. + } + + if other.short != nil { // low cardinality case + res.addShort(other.short) + } else if other.set != nil { // High cardinality case + res.merge(other.set) + } + } + + // Determine final cardinality and allow intermediate structures to be + // GCd. + if res.lowCardinality != nil { + res.count = int64(len(res.lowCardinality)) + } else { + res.count = int64(res.set.Cardinality()) + } + totalCardinality += res.count + res.set = nil + res.lowCardinality = nil + measurements = append(measurements, res) + } + + if err != nil { + return err + } + + // sort measurements by cardinality. + sort.Sort(sort.Reverse(measurements)) + + if report.topN > 0 { + // There may not be "topN" measurement cardinality to sub-slice. + n := int(math.Min(float64(report.topN), float64(len(measurements)))) + measurements = measurements[:n] + } + + tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', tabwriter.AlignRight) + + fmt.Fprintf(tw, "Summary\nDatabase Path: %s\nCardinality (exact): %d\n\n", report.enginePath, totalCardinality) + fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") + for _, res := range measurements { + fmt.Fprintf(tw, "%q\t%d\t\n", res.name, res.count) + } + + if err := tw.Flush(); err != nil { + return err + } + fmt.Fprint(os.Stdout, "\n\n") + + return nil +} + +func (report *reportTSI) printShardByMeasurement(id uint64) error { + allMap, ok := report.cardinalities[id] + if !ok { + return nil + } + + var totalCardinality int64 + all := make(cardinalities, 0, len(allMap)) + for _, card := range allMap { + n := card.cardinality() + if n == 0 { + continue + } + + totalCardinality += n + all = append(all, card) + } + + sort.Sort(sort.Reverse(all)) + + // Trim to top-n + if report.topN > 0 { + // There may not be "topN" measurement cardinality to sub-slice. + n := int(math.Min(float64(report.topN), float64(len(all)))) + all = all[:n] + } + + tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintf(tw, "===============\nShard ID: %d\nPath: %s\nCardinality (exact): %d\n\n", id, report.shardPaths[id], totalCardinality) + fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") + for _, card := range all { + fmt.Fprintf(tw, "%q\t%d\t\n", card.name, card.cardinality()) + } + fmt.Fprint(tw, "===============\n\n") + if err := tw.Flush(); err != nil { + return err + } + + return nil +}