diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ec5167ba9f..78bd1ab3dba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This release adds an embedded SQLite database for storing metadata required by t 1. [21784](https://github.com/influxdata/influxdb/pull/21784): Ported the `influxd inspect dumptsi` command from 1.x. 1. [21786](https://github.com/influxdata/influxdb/pull/21786): Ported the `influxd inspect deletetsm` command from 1.x. 1. [21802](https://github.com/influxdata/influxdb/pull/21802): Removed unused `chronograf-migator` package & chronograf API service, and updated various "chronograf" references. +1. [21828](https://github.com/influxdata/influxdb/pull/21828): Added the command `influx inspect verify-wal`. ### Bug Fixes diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 89fc4d27b9a..1ec61600300 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone" "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm" + "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_wal" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -36,6 +37,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) { base.AddCommand(dump_tsm.NewDumpTSMCommand()) base.AddCommand(dump_tsi.NewDumpTSICommand()) base.AddCommand(delete_tsm.NewDeleteTSMCommand()) + base.AddCommand(verify_wal.NewVerifyWALCommand()) return base, nil } diff --git a/cmd/influxd/inspect/verify_wal/verify_wal.go b/cmd/influxd/inspect/verify_wal/verify_wal.go new file mode 100644 index 00000000000..02b8a5c78cc --- /dev/null +++ b/cmd/influxd/inspect/verify_wal/verify_wal.go @@ -0,0 +1,144 @@ +package verify_wal + +import ( + "fmt" + "os" + "path/filepath" + "text/tabwriter" + "time" + + "github.com/influxdata/influxdb/v2/internal/fs" + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/spf13/cobra" +) + +type args struct { + dir string + verbose bool +} + +func NewVerifyWALCommand() *cobra.Command { + var arguments args + cmd := &cobra.Command{ + Use: `verify-wal`, + Short: "Check for WAL corruption", + Long: ` +This command will analyze the WAL (Write-Ahead Log) in a storage directory to +check if there are any corrupt files. If any corrupt files are found, the names +of said corrupt files will be reported. The tool will also count the total number +of entries in the scanned WAL files, in case this is of interest. +For each file, the following is output: + * The file name; + * "clean" (if the file is clean) OR + The first position of any corruption that is found +In the summary section, the following is printed: + * The number of WAL files scanned; + * The number of WAL entries scanned; + * A list of files found to be corrupt`, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return arguments.Run(cmd) + }, + } + + dir, err := fs.InfluxDir() + if err != nil { + panic(err) + } + dir = filepath.Join(dir, "engine/wal") + cmd.Flags().StringVar(&arguments.dir, "wal-dir", dir, "use provided WAL directory.") + cmd.Flags().BoolVarP(&arguments.verbose, "verbose", "v", false, "enable verbose logging") + return cmd +} + +func (a args) Run(cmd *cobra.Command) error { + // Verify valid directory + fi, err := os.Stat(a.dir) + if err != nil { + return fmt.Errorf("failed to stat %q: %w", a.dir, err) + } else if !fi.IsDir() { + return fmt.Errorf("%q is not a directory", a.dir) + } + + // Find all WAL files in provided directory + files, err := loadFiles(a.dir) + if err != nil { + return fmt.Errorf("failed to search for WAL files in directory %s: %w", a.dir, err) + } + if len(files) == 0 { + return fmt.Errorf("no WAL files found in directory %s", a.dir) + } + + start := time.Now() + tw := tabwriter.NewWriter(cmd.OutOrStdout(), 8, 2, 1, ' ', 0) + + var corruptFiles []string + var totalEntriesScanned int + + // Scan each WAL file + for _, fpath := range files { + var entriesScanned int + f, err := os.OpenFile(fpath, os.O_RDONLY, 0600) + if err != nil { + return fmt.Errorf("error opening file %s: %w. Exiting", fpath, err) + } + + clean := true + reader := tsm1.NewWALSegmentReader(f) + + // Check for corrupted entries + for reader.Next() { + entriesScanned++ + _, err := reader.Read() + if err != nil { + clean = false + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "%s: corrupt entry found at position %d\n", fpath, reader.Count()) + corruptFiles = append(corruptFiles, fpath) + break + } + } + + if a.verbose { + if entriesScanned == 0 { + // No data found in file + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "%s: no WAL entries found\n", f.Name()) + } else if clean { + // No corrupted entry found + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "%s: clean\n", fpath) + } + } + totalEntriesScanned += entriesScanned + _ = tw.Flush() + } + + // Print Summary + _, _ = fmt.Fprintf(tw, "Results:\n") + _, _ = fmt.Fprintf(tw, " Files checked: %d\n", len(files)) + _, _ = fmt.Fprintf(tw, " Total entries checked: %d\n", totalEntriesScanned) + _, _ = fmt.Fprintf(tw, " Corrupt files found: ") + if len(corruptFiles) == 0 { + _, _ = fmt.Fprintf(tw, "None") + } else { + for _, name := range corruptFiles { + _, _ = fmt.Fprintf(tw, "\n %s", name) + } + } + + _, _ = fmt.Fprintf(tw, "\nCompleted in %v\n", time.Since(start)) + _ = tw.Flush() + + return nil +} + +func loadFiles(dir string) (files []string, err error) { + err = filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if filepath.Ext(path) == "."+tsm1.WALFileExtension { + files = append(files, path) + } + return nil + }) + return +} diff --git a/cmd/influxd/inspect/verify_wal/verify_wal_test.go b/cmd/influxd/inspect/verify_wal/verify_wal_test.go new file mode 100644 index 00000000000..dd9e257b3dc --- /dev/null +++ b/cmd/influxd/inspect/verify_wal/verify_wal_test.go @@ -0,0 +1,157 @@ +package verify_wal + +import ( + "bytes" + "context" + "io" + "os" + "testing" + + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/stretchr/testify/require" +) + +type testInfo struct { + t *testing.T + path string + expectedOut string + expectErr bool + withStdErr bool +} + +func TestVerifies_InvalidFileType(t *testing.T) { + path, err := os.MkdirTemp("", "verify-wal") + require.NoError(t, err) + + _, err = os.CreateTemp(path, "verifywaltest*"+".txt") + require.NoError(t, err) + defer os.RemoveAll(path) + + runCommand(testInfo{ + t: t, + path: path, + expectedOut: "no WAL files found in directory", + expectErr: true, + }) +} + +func TestVerifies_InvalidNotDir(t *testing.T) { + path, file := newTempWALInvalid(t, true) + defer os.RemoveAll(path) + + runCommand(testInfo{ + t: t, + path: file.Name(), + expectedOut: "is not a directory", + expectErr: true, + }) +} + +func TestVerifies_InvalidEmptyFile(t *testing.T) { + path, _ := newTempWALInvalid(t, true) + defer os.RemoveAll(path) + + runCommand(testInfo{ + t: t, + path: path, + expectedOut: "no WAL entries found", + withStdErr: true, + }) +} + +func TestVerifies_Invalid(t *testing.T) { + path, _ := newTempWALInvalid(t, false) + defer os.RemoveAll(path) + + runCommand(testInfo{ + t: t, + path: path, + expectedOut: "corrupt entry found at position", + withStdErr: true, + }) +} + +func TestVerifies_Valid(t *testing.T) { + path := newTempWALValid(t) + defer os.RemoveAll(path) + + runCommand(testInfo{ + t: t, + path: path, + expectedOut: "clean", + withStdErr: true, + }) +} + +func runCommand(args testInfo) { + verify := NewVerifyWALCommand() + verify.SetArgs([]string{"--wal-dir", args.path, "--verbose"}) + + b := bytes.NewBufferString("") + verify.SetOut(b) + if args.withStdErr { + verify.SetErr(b) + } + + if args.expectErr { + require.Error(args.t, verify.Execute()) + } else { + require.NoError(args.t, verify.Execute()) + } + + out, err := io.ReadAll(b) + require.NoError(args.t, err) + require.Contains(args.t, string(out), args.expectedOut) +} + +func newTempWALValid(t *testing.T) string { + t.Helper() + + dir, err := os.MkdirTemp("", "verify-wal") + require.NoError(t, err) + + w := tsm1.NewWAL(dir, 0, 0) + defer w.Close() + require.NoError(t, w.Open()) + + p1 := tsm1.NewValue(1, 1.1) + p2 := tsm1.NewValue(1, int64(1)) + p3 := tsm1.NewValue(1, true) + p4 := tsm1.NewValue(1, "string") + p5 := tsm1.NewValue(1, ^uint64(0)) + + values := map[string][]tsm1.Value{ + "cpu,host=A#!~#float": {p1}, + "cpu,host=A#!~#int": {p2}, + "cpu,host=A#!~#bool": {p3}, + "cpu,host=A#!~#string": {p4}, + "cpu,host=A#!~#unsigned": {p5}, + } + + _, err = w.WriteMulti(context.Background(), values) + require.NoError(t, err) + + return dir +} + +func newTempWALInvalid(t *testing.T, empty bool) (string, *os.File) { + t.Helper() + + dir, err := os.MkdirTemp("", "verify-wal") + require.NoError(t, err) + + file, err := os.CreateTemp(dir, "verifywaltest*."+tsm1.WALFileExtension) + require.NoError(t, err) + + if !empty { + writer, err := os.OpenFile(file.Name(), os.O_APPEND|os.O_WRONLY, 0644) + require.NoError(t, err) + defer writer.Close() + + written, err := writer.Write([]byte("foobar")) + require.NoError(t, err) + require.Equal(t, 6, written) + } + + return dir, file +}