Skip to content

Commit

Permalink
feat: create influxd inspect verify-wal command (influxdata#21828)
Browse files Browse the repository at this point in the history
  • Loading branch information
serenibyss authored Jul 15, 2021
1 parent fa1c352 commit 31780ba
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21784](https://github.com/influxdata/influxdb/pull/21784): Ported the `influxd inspect dumptsi` command from 1.x.
1. [21786](https://github.com/influxdata/influxdb/pull/21786): Ported the `influxd inspect deletetsm` command from 1.x.
1. [21802](https://github.com/influxdata/influxdb/pull/21802): Removed unused `chronograf-migrator` package & chronograf API service, and updated various "chronograf" references.
1. [21828](https://github.com/influxdata/influxdb/pull/21828): Added the command `influxd inspect verify-wal`.

### Bug Fixes

Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tombstone"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_tsm"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_wal"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -36,6 +37,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
base.AddCommand(dump_tsm.NewDumpTSMCommand())
base.AddCommand(dump_tsi.NewDumpTSICommand())
base.AddCommand(delete_tsm.NewDeleteTSMCommand())
base.AddCommand(verify_wal.NewVerifyWALCommand())

return base, nil
}
144 changes: 144 additions & 0 deletions cmd/influxd/inspect/verify_wal/verify_wal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package verify_wal

import (
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"

"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/spf13/cobra"
)

// args holds the values bound to the verify-wal command-line flags.
type args struct {
// dir is the WAL directory to scan; it is bound to the --wal-dir flag.
dir string
// verbose enables the per-file "clean" / "no WAL entries found" messages;
// it is bound to the --verbose (-v) flag.
verbose bool
}

// NewVerifyWALCommand builds the `verify-wal` cobra command.
//
// The command accepts no positional arguments. Its --wal-dir flag defaults to
// "engine/wal" under the directory returned by fs.InfluxDir, and --verbose
// (-v) defaults to false. The function panics if fs.InfluxDir returns an
// error, because the default flag value cannot be computed in that case.
func NewVerifyWALCommand() *cobra.Command {
	// Work out the default WAL location before assembling the command.
	influxDir, err := fs.InfluxDir()
	if err != nil {
		panic(err)
	}
	defaultWALDir := filepath.Join(influxDir, "engine/wal")

	var arguments args
	cmd := &cobra.Command{
		Use:   "verify-wal",
		Short: "Check for WAL corruption",
		Long: `
This command will analyze the WAL (Write-Ahead Log) in a storage directory to
check if there are any corrupt files. If any corrupt files are found, the names
of said corrupt files will be reported. The tool will also count the total number
of entries in the scanned WAL files, in case this is of interest.
For each file, the following is output:
* The file name;
* "clean" (if the file is clean) OR
The first position of any corruption that is found
In the summary section, the following is printed:
* The number of WAL files scanned;
* The number of WAL entries scanned;
* A list of files found to be corrupt`,
		Args: cobra.NoArgs,
		// Positional arguments are rejected by cobra.NoArgs, so they are unused here.
		RunE: func(cmd *cobra.Command, _ []string) error {
			return arguments.Run(cmd)
		},
	}

	flagSet := cmd.Flags()
	flagSet.StringVar(&arguments.dir, "wal-dir", defaultWALDir, "use provided WAL directory.")
	flagSet.BoolVarP(&arguments.verbose, "verbose", "v", false, "enable verbose logging")

	return cmd
}

// Run scans every WAL file found under a.dir and reports corruption.
//
// It returns an error when a.dir cannot be stat'ed or is not a directory,
// when the directory walk fails, when no WAL files are found, or when a WAL
// file cannot be opened. Finding corrupt entries is NOT an error: corrupt
// files are reported on the command's stderr and listed in the summary that
// is written to the command's stdout.
func (a args) Run(cmd *cobra.Command) error {
	// Verify valid directory
	fi, err := os.Stat(a.dir)
	if err != nil {
		return fmt.Errorf("failed to stat %q: %w", a.dir, err)
	}
	if !fi.IsDir() {
		return fmt.Errorf("%q is not a directory", a.dir)
	}

	// Find all WAL files in provided directory
	files, err := loadFiles(a.dir)
	if err != nil {
		return fmt.Errorf("failed to search for WAL files in directory %s: %w", a.dir, err)
	}
	if len(files) == 0 {
		return fmt.Errorf("no WAL files found in directory %s", a.dir)
	}

	start := time.Now()

	var corruptFiles []string
	var totalEntriesScanned int

	// Scan each WAL file. The per-file work lives in verifyFile so that every
	// file handle is closed as soon as its scan finishes instead of staying
	// open for the rest of the run.
	for _, fpath := range files {
		entriesScanned, clean, err := a.verifyFile(cmd, fpath)
		if err != nil {
			return fmt.Errorf("error opening file %s: %w. Exiting", fpath, err)
		}
		if !clean {
			corruptFiles = append(corruptFiles, fpath)
		}
		totalEntriesScanned += entriesScanned
	}

	// Print Summary. Output errors are ignored, matching the per-file messages.
	tw := tabwriter.NewWriter(cmd.OutOrStdout(), 8, 2, 1, ' ', 0)
	_, _ = fmt.Fprintf(tw, "Results:\n")
	_, _ = fmt.Fprintf(tw, " Files checked: %d\n", len(files))
	_, _ = fmt.Fprintf(tw, " Total entries checked: %d\n", totalEntriesScanned)
	_, _ = fmt.Fprintf(tw, " Corrupt files found: ")
	if len(corruptFiles) == 0 {
		_, _ = fmt.Fprintf(tw, "None")
	} else {
		for _, name := range corruptFiles {
			_, _ = fmt.Fprintf(tw, "\n %s", name)
		}
	}

	_, _ = fmt.Fprintf(tw, "\nCompleted in %v\n", time.Since(start))
	_ = tw.Flush()

	return nil
}

// verifyFile reads the WAL file at fpath entry by entry, stopping at the first
// entry that fails to read. It writes the per-file diagnostics to the
// command's stderr and returns the number of entries it attempted to read and
// whether all of them were read without error. A non-nil error means the file
// could not be opened.
func (a args) verifyFile(cmd *cobra.Command, fpath string) (int, bool, error) {
	f, err := os.Open(fpath)
	if err != nil {
		return 0, false, err
	}
	// The handle is read-only, so a Close error cannot lose data and is ignored.
	defer f.Close()

	entriesScanned := 0
	clean := true
	reader := tsm1.NewWALSegmentReader(f)

	// Check for corrupted entries
	for reader.Next() {
		entriesScanned++
		if _, err := reader.Read(); err != nil {
			clean = false
			_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "%s: corrupt entry found at position %d\n", fpath, reader.Count())
			break
		}
	}

	if a.verbose {
		switch {
		case entriesScanned == 0:
			// No data found in file
			_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "%s: no WAL entries found\n", fpath)
		case clean:
			// No corrupted entry found
			_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "%s: clean\n", fpath)
		}
	}

	return entriesScanned, clean, nil
}

// loadFiles walks dir recursively and returns the path of every non-directory
// entry whose extension is the WAL file extension. Any error reported by the
// walk aborts it and is returned alongside the paths collected so far.
func loadFiles(dir string) (files []string, err error) {
	err = filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
		if err != nil {
			return err
		}
		// A directory that merely ends in the WAL extension is not a WAL
		// file; listing it would make the caller try to read it as one and
		// report it as corrupt. Its contents are still visited by the walk.
		if d.IsDir() {
			return nil
		}
		if filepath.Ext(path) == "."+tsm1.WALFileExtension {
			files = append(files, path)
		}
		return nil
	})
	return files, err
}
157 changes: 157 additions & 0 deletions cmd/influxd/inspect/verify_wal/verify_wal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package verify_wal

import (
"bytes"
"context"
"io"
"os"
"testing"

"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/require"
)

// testInfo describes one invocation of the verify-wal command made by
// runCommand and the output that invocation is expected to produce.
type testInfo struct {
// t is the test the assertions are reported against.
t *testing.T
// path is passed to the command as the --wal-dir value.
path string
// expectedOut is a substring that must appear in the captured output.
expectedOut string
// expectErr is true when executing the command must return an error.
expectErr bool
// withStdErr routes the command's stderr into the same buffer as its
// stdout so that expectedOut can match diagnostics written to stderr.
withStdErr bool
}

// TestVerifies_InvalidFileType checks that a directory containing only a
// non-WAL file (a .txt file) makes the command fail with a "no WAL files
// found" error.
func TestVerifies_InvalidFileType(t *testing.T) {
	path, err := os.MkdirTemp("", "verify-wal")
	require.NoError(t, err)
	// Register cleanup as soon as the directory exists so that it is removed
	// even if creating the file below fails.
	defer os.RemoveAll(path)

	file, err := os.CreateTemp(path, "verifywaltest*"+".txt")
	require.NoError(t, err)
	// Only the file's presence matters; release the handle right away.
	require.NoError(t, file.Close())

	runCommand(testInfo{
		t:           t,
		path:        path,
		expectedOut: "no WAL files found in directory",
		expectErr:   true,
	})
}

// TestVerifies_InvalidNotDir checks that passing a regular file as the WAL
// directory makes the command fail with an "is not a directory" error.
func TestVerifies_InvalidNotDir(t *testing.T) {
	tmpDir, walFile := newTempWALInvalid(t, true)
	defer os.RemoveAll(tmpDir)

	// Point --wal-dir at the file itself rather than at its parent directory.
	tc := testInfo{
		t:           t,
		path:        walFile.Name(),
		expectedOut: "is not a directory",
		expectErr:   true,
	}
	runCommand(tc)
}

// TestVerifies_InvalidEmptyFile checks that an empty WAL file is reported as
// containing no WAL entries and that the command still succeeds.
func TestVerifies_InvalidEmptyFile(t *testing.T) {
	tmpDir, _ := newTempWALInvalid(t, true)
	defer os.RemoveAll(tmpDir)

	// The message is written to stderr, so stderr has to be captured as well.
	tc := testInfo{
		t:           t,
		path:        tmpDir,
		expectedOut: "no WAL entries found",
		withStdErr:  true,
	}
	runCommand(tc)
}

// TestVerifies_Invalid checks that a WAL file holding garbage bytes is
// reported as corrupt while the command itself still succeeds.
func TestVerifies_Invalid(t *testing.T) {
	tmpDir, _ := newTempWALInvalid(t, false)
	defer os.RemoveAll(tmpDir)

	// The corruption report goes to stderr, so stderr has to be captured as well.
	tc := testInfo{
		t:           t,
		path:        tmpDir,
		expectedOut: "corrupt entry found at position",
		withStdErr:  true,
	}
	runCommand(tc)
}

// TestVerifies_Valid checks that a WAL written through tsm1.WAL is reported
// as clean.
func TestVerifies_Valid(t *testing.T) {
	walDir := newTempWALValid(t)
	defer os.RemoveAll(walDir)

	// The "clean" message is written to stderr in verbose mode.
	tc := testInfo{
		t:           t,
		path:        walDir,
		expectedOut: "clean",
		withStdErr:  true,
	}
	runCommand(tc)
}

// runCommand executes the verify-wal command with --wal-dir set to args.path
// and --verbose enabled, then asserts on the result.
//
// The command's stdout is always captured; its stderr is captured into the
// same buffer only when args.withStdErr is set. The call must return an error
// exactly when args.expectErr is true, and the captured output must contain
// args.expectedOut.
func runCommand(args testInfo) {
	// Attribute assertion failures to the calling test, as the other helpers
	// in this file do.
	args.t.Helper()

	verify := NewVerifyWALCommand()
	verify.SetArgs([]string{"--wal-dir", args.path, "--verbose"})

	b := bytes.NewBufferString("")
	verify.SetOut(b)
	if args.withStdErr {
		verify.SetErr(b)
	}

	err := verify.Execute()
	if args.expectErr {
		require.Error(args.t, err)
	} else {
		require.NoError(args.t, err)
	}

	out, err := io.ReadAll(b)
	require.NoError(args.t, err)
	require.Contains(args.t, string(out), args.expectedOut)
}

// newTempWALValid creates a temporary directory, opens a tsm1.WAL in it and
// writes one value each of float, int, bool, string and unsigned type through
// WriteMulti. The WAL is closed before the function returns. The caller is
// responsible for removing the returned directory.
func newTempWALValid(t *testing.T) string {
	t.Helper()

	walDir, err := os.MkdirTemp("", "verify-wal")
	require.NoError(t, err)

	wal := tsm1.NewWAL(walDir, 0, 0)
	defer wal.Close()
	require.NoError(t, wal.Open())

	// One series key per supported value type, each holding a single point.
	points := map[string][]tsm1.Value{
		"cpu,host=A#!~#float":    {tsm1.NewValue(1, 1.1)},
		"cpu,host=A#!~#int":      {tsm1.NewValue(1, int64(1))},
		"cpu,host=A#!~#bool":     {tsm1.NewValue(1, true)},
		"cpu,host=A#!~#string":   {tsm1.NewValue(1, "string")},
		"cpu,host=A#!~#unsigned": {tsm1.NewValue(1, ^uint64(0))},
	}

	_, err = wal.WriteMulti(context.Background(), points)
	require.NoError(t, err)

	return walDir
}

// newTempWALInvalid creates a temporary directory containing a single file
// with the WAL extension that is not a valid WAL segment. When empty is true
// the file is left empty; otherwise the bytes "foobar" are written to it.
//
// It returns the directory and the created file. The file is already closed
// when it is returned; it is handed back so that callers can obtain its path
// through Name(). The caller is responsible for removing the directory.
func newTempWALInvalid(t *testing.T, empty bool) (string, *os.File) {
	t.Helper()

	dir, err := os.MkdirTemp("", "verify-wal")
	require.NoError(t, err)

	file, err := os.CreateTemp(dir, "verifywaltest*."+tsm1.WALFileExtension)
	require.NoError(t, err)

	if !empty {
		// The file is new and empty, so writing through the handle returned
		// by CreateTemp is equivalent to appending via a second handle.
		written, err := file.Write([]byte("foobar"))
		require.NoError(t, err)
		require.Equal(t, 6, written)
	}

	// Close here so no descriptor stays open for the rest of the test; no
	// caller in this file closes the returned handle.
	require.NoError(t, file.Close())

	return dir, file
}

0 comments on commit 31780ba

Please sign in to comment.