diff --git a/CHANGELOG.md b/CHANGELOG.md index 61b60d5d608..7b18b3cbf79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21784](https://github.com/influxdata/influxdb/pull/21784): Ported the `influxd inspect dumptsi` command from 1.x. 1. [21786](https://github.com/influxdata/influxdb/pull/21786): Ported the `influxd inspect deletetsm` command from 1.x. 1. [21802](https://github.com/influxdata/influxdb/pull/21802): Removed unused `chronograf-migator` package & chronograf API service, and updated various "chronograf" references. +1. [21888](https://github.com/influxdata/influxdb/pull/21888/): Ported the `influxd inspect dump-wal` command from 1.x. 1. [21828](https://github.com/influxdata/influxdb/pull/21828): Added the command `influx inspect verify-wal`. 1. [21814](https://github.com/influxdata/influxdb/pull/21814): Ported the `influxd inspect report-tsm` command from 1.x. diff --git a/cmd/influxd/inspect/dump_wal/dump_wal.go b/cmd/influxd/inspect/dump_wal/dump_wal.go new file mode 100644 index 00000000000..a75556d5d26 --- /dev/null +++ b/cmd/influxd/inspect/dump_wal/dump_wal.go @@ -0,0 +1,164 @@ +package dump_wal + +import ( + "fmt" + "os" + "path/filepath" + "sort" + + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/spf13/cobra" +) + +type dumpWALCommand struct { + findDuplicates bool +} + +func NewDumpWALCommand() *cobra.Command { + var dumpWAL dumpWALCommand + cmd := &cobra.Command{ + Use: "dump-wal", + Short: "Dumps TSM data from WAL files", + Long: ` +This tool dumps data from WAL files for debugging purposes. Given at least one WAL file path as an argument, the tool will parse and print out the entries in each file. +It has two modes of operation, depending on the --find-duplicates flag. +--find-duplicates=false (default): for each file, the following is printed: + * The file name + * for each entry, + * The type of the entry (either [write] or [delete-bucket-range]); + * The formatted entry contents +--find-duplicates=true: for each file, the following is printed: + * The file name + * A list of keys in the file that have duplicate or out of order timestamps +`, + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return dumpWAL.run(args) + }, + } + + cmd.Flags().BoolVarP(&dumpWAL.findDuplicates, "find-duplicates", "", false, + "ignore dumping entries; only report keys in the WAL files that are duplicates or out of order (default false)") + + return cmd +} + +func (dumpWAL *dumpWALCommand) run(args []string) error { + + // Process each WAL file. + for _, path := range args { + if err := dumpWAL.processWALFile(path); err != nil { + return err + } + } + return nil +} + +func (dumpWAL *dumpWALCommand) processWALFile(path string) error { + if filepath.Ext(path) != "."+tsm1.WALFileExtension { + fmt.Fprintf(os.Stderr, "invalid wal file path, skipping %s\n", path) + return nil + } + + // Track the earliest timestamp for each key and a set of keys with out-of-order points. + minTimestampByKey := make(map[string]int64) + duplicateKeys := make(map[string]struct{}) + + // Open WAL reader. + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + r := tsm1.NewWALSegmentReader(f) + defer r.Close() + + // Iterate over the WAL entries. + for r.Next() { + entry, err := r.Read() + if err != nil { + return fmt.Errorf("failed to read entry from %q: %w", path, err) + } + + switch entry := entry.(type) { + case *tsm1.WriteWALEntry: + if !dumpWAL.findDuplicates { + fmt.Printf("[write] sz=%d\n", entry.MarshalSize()) + } + + keys := make([]string, 0, len(entry.Values)) + for k := range entry.Values { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + for _, v := range entry.Values[k] { + t := v.UnixNano() + + if dumpWAL.findDuplicates { + // Check for duplicate/out of order keys. + if min, ok := minTimestampByKey[k]; ok && t <= min { + duplicateKeys[k] = struct{}{} + } + minTimestampByKey[k] = t + + // Skip printing if we are only showing duplicate keys. + continue + } + + switch v := v.(type) { + case tsm1.IntegerValue: + fmt.Printf("%s %vi %d\n", k, v.Value(), t) + case tsm1.UnsignedValue: + fmt.Printf("%s %vu %d\n", k, v.Value(), t) + case tsm1.FloatValue: + fmt.Printf("%s %v %d\n", k, v.Value(), t) + case tsm1.BooleanValue: + fmt.Printf("%s %v %d\n", k, v.Value(), t) + case tsm1.StringValue: + fmt.Printf("%s %q %d\n", k, v.Value(), t) + default: + fmt.Printf("%s EMPTY\n", k) + } + } + } + + case *tsm1.DeleteWALEntry: + fmt.Printf("[delete] sz=%d\n", entry.MarshalSize()) + for _, k := range entry.Keys { + fmt.Printf("%s\n", string(k)) + } + + case *tsm1.DeleteRangeWALEntry: + fmt.Printf("[delete-range] min=%d max=%d sz=%d\n", entry.Min, entry.Max, entry.MarshalSize()) + for _, k := range entry.Keys { + fmt.Printf("%s\n", string(k)) + } + + default: + return fmt.Errorf("invalid wal entry: %#v", entry) + } + } + + // Print keys with duplicate or out-of-order points, if requested. + if dumpWAL.findDuplicates { + keys := make([]string, 0, len(duplicateKeys)) + + if len(duplicateKeys) == 0 { + fmt.Println("No duplicates or out of order timestamps found") + return nil + } + + for k := range duplicateKeys { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + fmt.Println(k) + } + } + + return nil +} diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 43fe941ca90..c7995b3317a 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -4,6 +4,7 @@ import ( "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/delete_tsm" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_tsi" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_tsm" + "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_wal" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/report_tsi" @@ -40,6 +41,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { base.AddCommand(dump_tsm.NewDumpTSMCommand()) base.AddCommand(dump_tsi.NewDumpTSICommand()) base.AddCommand(delete_tsm.NewDeleteTSMCommand()) + base.AddCommand(dump_wal.NewDumpWALCommand()) base.AddCommand(verify_wal.NewVerifyWALCommand()) base.AddCommand(report_tsm.NewReportTSMCommand())