diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 36c2923cb39..998d4a39999 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -15,6 +15,7 @@ package etcdserver import ( + "io" "log" "os" "path" @@ -52,15 +53,15 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { // SaveSnap saves the snapshot to disk and release the locked // wal files since they will not be used. func (st *storage) SaveSnap(snap raftpb.Snapshot) error { - err := st.Snapshotter.SaveSnap(snap) - if err != nil { - return err - } walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, } - err = st.WAL.SaveSnapshot(walsnap) + err := st.WAL.SaveSnapshot(walsnap) + if err != nil { + return err + } + err = st.Snapshotter.SaveSnap(snap) if err != nil { return err } @@ -72,13 +73,31 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { } func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { - var err error - if w, err = wal.Open(waldir, snap); err != nil { - log.Fatalf("etcdserver: open wal error: %v", err) - } - var wmetadata []byte - if wmetadata, st, ents, err = w.ReadAll(); err != nil { - log.Fatalf("etcdserver: read wal error: %v", err) + var ( + err error + wmetadata []byte + ) + + repaired := false + for { + if w, err = wal.Open(waldir, snap); err != nil { + log.Fatalf("etcdserver: open wal error: %v", err) + } + if wmetadata, st, ents, err = w.ReadAll(); err != nil { + w.Close() + // we can only repair ErrUnexpectedEOF and we never repair twice. + if repaired || err != io.ErrUnexpectedEOF { + log.Fatalf("etcdserver: read wal error (%v) and cannot be repaired", err) + } + if !wal.Repair(waldir) { + log.Fatalf("etcdserver: WAL error (%v) cannot be repaired", err) + } else { + log.Printf("etcdserver: repaired WAL error (%v)", err) + repaired = true + } + continue + } + break } var metadata pb.Metadata pbutil.MustUnmarshal(&metadata, wmetadata) diff --git a/wal/repair.go b/wal/repair.go new file mode 100644 index 00000000000..979df647bf2 --- /dev/null +++ b/wal/repair.go @@ -0,0 +1,107 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io" + "log" + "os" + "path" + + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/wal/walpb" +) + +// Repair tries to repair the unexpectedEOF error in the +// last wal file by truncating. +func Repair(dirpath string) bool { + f, err := openLast(dirpath) + if err != nil { + return false + } + defer f.Close() + + n := 0 + rec := &walpb.Record{} + + decoder := newDecoder(f) + defer decoder.close() + for { + err := decoder.decode(rec) + switch err { + case nil: + n += 8 + rec.Size() + // update crc of the decoder when necessary + switch rec.Type { + case crcType: + crc := decoder.crc.Sum32() + // current crc of decoder must match the crc of the record. + // do no need to match 0 crc, since the decoder is a new one at this case. + if crc != 0 && rec.Validate(crc) != nil { + return false + } + decoder.updateCRC(rec.Crc) + } + continue + case io.EOF: + return true + case io.ErrUnexpectedEOF: + log.Printf("wal: repairing %v", f.Name()) + bf, bferr := os.Create(f.Name() + ".broken") + if bferr != nil { + log.Printf("wal: could not repair %v, failed to create backup file", f.Name()) + return false + } + defer bf.Close() + + if _, err = f.Seek(0, os.SEEK_SET); err != nil { + log.Printf("wal: could not repair %v, failed to read file", f.Name()) + return false + } + + if _, err = io.Copy(bf, f); err != nil { + log.Printf("wal: could not repair %v, failed to copy file", f.Name()) + return false + } + + if err = f.Truncate(int64(n)); err != nil { + log.Printf("wal: could not repair %v, failed to truncate file", f.Name()) + return false + } + if err = f.Sync(); err != nil { + log.Printf("wal: could not repair %v, failed to sync file", f.Name()) + return false + } + return true + default: + log.Printf("wal: could not repair error (%v)", err) + return false + } + } +} + +// openLast opens the last wal file for read and write. +func openLast(dirpath string) (*os.File, error) { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrFileNotFound + } + last := path.Join(dirpath, names[len(names)-1]) + return os.OpenFile(last, os.O_RDWR, 0) +} diff --git a/wal/repair_test.go b/wal/repair_test.go new file mode 100644 index 00000000000..38c03b0c9ff --- /dev/null +++ b/wal/repair_test.go @@ -0,0 +1,91 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io" + "io/ioutil" + "os" + "testing" + + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" +) + +func TestRepair(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + // create WAL + w, err := Create(p, nil) + defer w.Close() + if err != nil { + t.Fatal(err) + } + + n := 10 + for i := 1; i <= n; i++ { + es := []raftpb.Entry{{Index: uint64(i)}} + if err = w.Save(raftpb.HardState{}, es); err != nil { + t.Fatal(err) + } + } + w.Close() + + // break the wal. + f, err := openLast(p) + if err != nil { + t.Fatal(err) + } + offset, err := f.Seek(-4, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + err = f.Truncate(offset) + if err != nil { + t.Fatal(err) + } + + // verify we have broke the wal + w, err = Open(p, walpb.Snapshot{}) + if err != nil { + t.Fatal(err) + } + _, _, _, err = w.ReadAll() + if err != io.ErrUnexpectedEOF { + t.Fatalf("err = %v, want %v", err, io.ErrUnexpectedEOF) + } + w.Close() + + // repair the wal + ok := Repair(p) + if !ok { + t.Fatalf("fix = %t, want %t", ok, true) + } + + w, err = Open(p, walpb.Snapshot{}) + if err != nil { + t.Fatal(err) + } + _, _, ents, err := w.ReadAll() + if err != nil { + t.Fatalf("err = %v, want %v", err, nil) + } + if len(ents) != n-1 { + t.Fatalf("len(ents) = %d, want %d", len(ents), n-1) + } +} diff --git a/wal/util.go b/wal/util.go index c80fbdef237..b8b292fe797 100644 --- a/wal/util.go +++ b/wal/util.go @@ -15,12 +15,18 @@ package wal import ( + "errors" "fmt" "log" + "strings" "github.com/coreos/etcd/pkg/fileutil" ) +var ( + badWalName = errors.New("bad wal name") +) + func Exist(dirpath string) bool { names, err := fileutil.ReadDir(dirpath) if err != nil { @@ -76,8 +82,11 @@ func checkWalNames(names []string) []string { } func parseWalName(str string) (seq, index uint64, err error) { + if !strings.HasSuffix(str, ".wal") { + return 0, 0, badWalName + } _, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index) - return + return seq, index, err } func walName(seq, index uint64) string { diff --git a/wal/wal.go b/wal/wal.go index ecc809446e2..d242647eec0 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -414,14 +414,14 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { } // TODO(xiangli): no more reference operator - if err := w.saveState(&st); err != nil { - return err - } for i := range ents { if err := w.saveEntry(&ents[i]); err != nil { return err } } + if err := w.saveState(&st); err != nil { + return err + } fstat, err := w.f.Stat() if err != nil {