From 3d7539587557dfce8638c826e246f26c57d80d28 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 6 Mar 2017 14:08:46 -0800 Subject: [PATCH] *: remove never-unused vars, minor lint fix Signed-off-by: Gyu-Ho Lee --- auth/store_test.go | 3 +- etcdctl/ctlv2/command/role_commands.go | 4 +- etcdctl/ctlv3/command/printer_protobuf.go | 2 +- etcdserver/raft.go | 5 +- etcdserver/raft_test.go | 1 + etcdserver/server.go | 18 +++-- etcdserver/server_test.go | 91 ++++++++++++++--------- integration/cluster.go | 2 +- integration/v3_lease_test.go | 2 +- lease/lessor_test.go | 3 +- mvcc/kvstore_bench_test.go | 1 - pkg/transport/keepalive_listener_test.go | 4 + raft/node_test.go | 1 + rafthttp/stream.go | 5 +- store/store_test.go | 4 + store/watcher_hub.go | 2 +- wal/repair_test.go | 6 +- wal/wal_test.go | 12 ++- 18 files changed, 107 insertions(+), 59 deletions(-) diff --git a/auth/store_test.go b/auth/store_test.go index 7116347b893..64d864b9a34 100644 --- a/auth/store_test.go +++ b/auth/store_test.go @@ -345,7 +345,7 @@ func TestRoleRevokePermission(t *testing.T) { t.Fatal(err) } - r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"}) + _, err = as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"}) if err != nil { t.Fatal(err) } @@ -359,6 +359,7 @@ func TestRoleRevokePermission(t *testing.T) { t.Fatal(err) } + var r *pb.AuthRoleGetResponse r, err = as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"}) if err != nil { t.Fatal(err) diff --git a/etcdctl/ctlv2/command/role_commands.go b/etcdctl/ctlv2/command/role_commands.go index 65c68eb596f..838b040623e 100644 --- a/etcdctl/ctlv2/command/role_commands.go +++ b/etcdctl/ctlv2/command/role_commands.go @@ -117,13 +117,13 @@ func actionRoleAdd(c *cli.Context) error { api, role := mustRoleAPIAndName(c) ctx, cancel := contextWithTotalTimeout(c) defer cancel() - currentRole, err := api.GetRole(ctx, role) + currentRole, _ := api.GetRole(ctx, role) if currentRole != nil { fmt.Fprintf(os.Stderr, "Role %s already exists\n", role) os.Exit(1) } - err = api.AddRole(ctx, role) + err := api.AddRole(ctx, role) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) diff --git a/etcdctl/ctlv3/command/printer_protobuf.go b/etcdctl/ctlv3/command/printer_protobuf.go index d9acd458212..c5109c5c2ee 100644 --- a/etcdctl/ctlv3/command/printer_protobuf.go +++ b/etcdctl/ctlv3/command/printer_protobuf.go @@ -60,5 +60,5 @@ func printPB(v interface{}) { fmt.Fprintf(os.Stderr, "%v\n", err) return } - fmt.Printf(string(b)) + fmt.Print(string(b)) } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index d7ec176eb3a..643caa455c9 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -113,7 +113,7 @@ type raftNode struct { readStateC chan raft.ReadState // utility - ticker <-chan time.Time + ticker *time.Ticker // contention detectors for raft heartbeat message td *contention.TimeoutDetector heartbeat time.Duration // for logging @@ -143,7 +143,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { for { select { - case <-r.ticker: + case <-r.ticker.C: r.Tick() case rd := <-r.Ready(): if rd.SoftState != nil { @@ -303,6 +303,7 @@ func (r *raftNode) stop() { func (r *raftNode) onStop() { r.Stop() + r.ticker.Stop() r.transport.Stop() if err := r.storage.Close(); err != nil { plog.Panicf("raft close storage error: %v", err) diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index dcd8c9ada2a..eb51c143411 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -158,6 +158,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }} srv.r.start(nil) n.readyc <- raft.Ready{} diff --git a/etcdserver/server.go b/etcdserver/server.go index 76f5ff6162e..a6432bd210e 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -220,7 +220,7 @@ type EtcdServer struct { stats *stats.ServerStats lstats *stats.LeaderStats - SyncTicker <-chan time.Time + SyncTicker *time.Ticker // compactor is used to auto-compact the KV. compactor *compactor.Periodic @@ -416,7 +416,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { r: raftNode{ isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, - ticker: time.Tick(heartbeat), + ticker: time.NewTicker(heartbeat), // set up contention detectors for raft heartbeat message. // expect to send a heartbeat within 2 heartbeat intervals. td: contention.NewTimeoutDetector(2 * heartbeat), @@ -431,7 +431,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { cluster: cl, stats: sstats, lstats: lstats, - SyncTicker: time.Tick(500 * time.Millisecond), + SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: prt, reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), forceVersionC: make(chan struct{}), @@ -606,7 +606,7 @@ type raftReadyHandler struct { } func (s *EtcdServer) run() { - snap, err := s.r.raftStorage.Snapshot() + sn, err := s.r.raftStorage.Snapshot() if err != nil { plog.Panicf("get snapshot from raft storage error: %v", err) } @@ -637,7 +637,7 @@ func (s *EtcdServer) run() { } setSyncC(nil) } else { - setSyncC(s.SyncTicker) + setSyncC(s.SyncTicker.C) if s.compactor != nil { s.compactor.Resume() } @@ -664,9 +664,9 @@ func (s *EtcdServer) run() { // asynchronously accept apply packets, dispatch progress in-order sched := schedule.NewFIFOScheduler() ep := etcdProgress{ - confState: snap.Metadata.ConfState, - snapi: snap.Metadata.Index, - appliedi: snap.Metadata.Index, + confState: sn.Metadata.ConfState, + snapi: sn.Metadata.Index, + appliedi: sn.Metadata.Index, } defer func() { @@ -679,6 +679,8 @@ func (s *EtcdServer) run() { // wait for gouroutines before closing raft so wal stays open s.wg.Wait() + s.SyncTicker.Stop() + // must stop raft after scheduler-- etcdserver can leak rafthttp pipelines // by adding a peer after raft stops the transport s.r.stop() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5c794441855..af79517e0d4 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -173,11 +173,13 @@ func TestApplyRepeat(t *testing.T) { raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - Cfg: &ServerConfig{}, - store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + Cfg: &ServerConfig{}, + store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} s.start() @@ -635,9 +637,11 @@ func TestDoProposal(t *testing.T) { storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + store: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} srv.start() @@ -788,6 +792,7 @@ func TestSyncTimeout(t *testing.T) { func TestSyncTrigger(t *testing.T) { n := newReadyNode() st := make(chan time.Time, 1) + tk := &time.Ticker{C: st} srv := &EtcdServer{ Cfg: &ServerConfig{TickMs: 1}, r: raftNode{ @@ -795,9 +800,10 @@ func TestSyncTrigger(t *testing.T) { raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), storage: mockstorage.NewStorageRecorder(""), + ticker: &time.Ticker{}, }, store: mockstore.NewNop(), - SyncTicker: st, + SyncTicker: tk, reqIDGen: idutil.NewGenerator(0, time.Time{}), } @@ -910,9 +916,11 @@ func TestTriggerSnap(t *testing.T) { raftStorage: raft.NewMemoryStorage(), storage: p, transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + store: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} @@ -979,9 +987,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { storage: mockstorage.NewStorageRecorder(testdir), raftStorage: rs, msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), + ticker: &time.Ticker{}, }, - store: st, - cluster: cl, + store: st, + cluster: cl, + SyncTicker: &time.Ticker{}, } s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} @@ -1059,11 +1069,13 @@ func TestAddMember(t *testing.T) { raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - Cfg: &ServerConfig{}, - store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + Cfg: &ServerConfig{}, + store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } s.start() m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} @@ -1099,11 +1111,13 @@ func TestRemoveMember(t *testing.T) { raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - Cfg: &ServerConfig{}, - store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + Cfg: &ServerConfig{}, + store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } s.start() err := s.RemoveMember(context.TODO(), 1234) @@ -1138,10 +1152,12 @@ func TestUpdateMember(t *testing.T) { raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), + store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } s.start() wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} @@ -1173,11 +1189,12 @@ func TestPublish(t *testing.T) { readych: make(chan struct{}), Cfg: &ServerConfig{TickMs: 1}, id: 1, - r: raftNode{Node: n}, + r: raftNode{Node: n, ticker: &time.Ticker{}}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, cluster: &membership.RaftCluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } srv.publish(time.Hour) @@ -1216,13 +1233,15 @@ func TestPublishStopped(t *testing.T) { r: raftNode{ Node: newNodeNop(), transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, }, - cluster: &membership.RaftCluster{}, - w: mockwait.NewNop(), - done: make(chan struct{}), - stopping: make(chan struct{}), - stop: make(chan struct{}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), + cluster: &membership.RaftCluster{}, + w: mockwait.NewNop(), + done: make(chan struct{}), + stopping: make(chan struct{}), + stop: make(chan struct{}), + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } close(srv.stopping) srv.publish(time.Hour) @@ -1232,11 +1251,12 @@ func TestPublishStopped(t *testing.T) { func TestPublishRetry(t *testing.T) { n := newNodeRecorderStream() srv := &EtcdServer{ - Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: n}, - w: mockwait.NewNop(), - stopping: make(chan struct{}), - reqIDGen: idutil.NewGenerator(0, time.Time{}), + Cfg: &ServerConfig{TickMs: 1}, + r: raftNode{Node: n, ticker: &time.Ticker{}}, + w: mockwait.NewNop(), + stopping: make(chan struct{}), + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } // expect multiple proposals from retrying ch := make(chan struct{}) @@ -1270,11 +1290,12 @@ func TestUpdateVersion(t *testing.T) { srv := &EtcdServer{ id: 1, Cfg: &ServerConfig{TickMs: 1}, - r: raftNode{Node: n}, + r: raftNode{Node: n, ticker: &time.Ticker{}}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, cluster: &membership.RaftCluster{}, w: w, reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, } srv.updateClusterVersion("2.0.0") diff --git a/integration/cluster.go b/integration/cluster.go index 72d51a1fe03..e683e2c9fe3 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -603,7 +603,7 @@ func (m *member) Launch() error { if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil { return fmt.Errorf("failed to initialize the etcd server: %v", err) } - m.s.SyncTicker = time.Tick(500 * time.Millisecond) + m.s.SyncTicker = time.NewTicker(500 * time.Millisecond) m.s.Start() m.raftHandler = &testutil.PauseableHandler{Next: v2http.NewPeerHandler(m.s)} diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 6c54869c799..ed38b7e2941 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -105,7 +105,7 @@ func TestV3LeaseGrantByID(t *testing.T) { } // create duplicate fixed lease - lresp, err = toGRPC(clus.RandClient()).Lease.LeaseGrant( + _, err = toGRPC(clus.RandClient()).Lease.LeaseGrant( context.TODO(), &pb.LeaseGrantRequest{ID: 1, TTL: 1}) if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) { diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 1907acd8f66..dfcd77f4f33 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -55,11 +55,12 @@ func TestLessorGrant(t *testing.T) { t.Errorf("term = %v, want at least %v", l.Remaining(), minLeaseTTLDuration-time.Second) } - nl, err := le.Grant(1, 1) + _, err = le.Grant(1, 1) if err == nil { t.Errorf("allocated the same lease") } + var nl *Lease nl, err = le.Grant(2, 1) if err != nil { t.Errorf("could not grant lease 2 (%v)", err) diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index c111613cea6..41a42420bc0 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -108,7 +108,6 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { } } b.ResetTimer() - s = NewStore(be, &lease.FakeLessor{}, &i) } func BenchmarkStoreRestoreRevs1(b *testing.B) { diff --git a/pkg/transport/keepalive_listener_test.go b/pkg/transport/keepalive_listener_test.go index 67f21fe5d4b..f8c062cf72f 100644 --- a/pkg/transport/keepalive_listener_test.go +++ b/pkg/transport/keepalive_listener_test.go @@ -45,6 +45,10 @@ func TestNewKeepAliveListener(t *testing.T) { ln.Close() ln, err = net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("unexpected Listen error: %v", err) + } + // tls tmp, err := createTempFile([]byte("XXX")) if err != nil { diff --git a/raft/node_test.go b/raft/node_test.go index ba604ee86e7..c57fd05fab5 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -301,6 +301,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { n.Campaign(context.TODO()) rdyEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() done := make(chan struct{}) stop := make(chan struct{}) applyConfChan := make(chan struct{}) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 40d66687387..0f29428b2ea 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -142,7 +142,8 @@ func (cw *streamWriter) run() { flusher http.Flusher batched int ) - tickc := time.Tick(ConnReadTimeout / 3) + tickc := time.NewTicker(ConnReadTimeout / 3) + defer tickc.Stop() unflushed := 0 plog.Infof("started streaming with peer %s (writer)", cw.peerID) @@ -214,7 +215,7 @@ func (cw *streamWriter) run() { plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) - heartbeatc, msgc = tickc, cw.msgc + heartbeatc, msgc = tickc.C, cw.msgc case <-cw.stopc: if cw.close() { plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) diff --git a/store/store_test.go b/store/store_test.go index 006a488705d..1a4527352d4 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -348,6 +348,7 @@ func TestStoreUpdateValueTTL(t *testing.T) { var eidx uint64 = 2 s.Create("/foo", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) _, err := s.Update("/foo", "baz", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + assert.Nil(t, err, "") e, _ := s.Get("/foo", false, false) assert.Equal(t, *e.Node.Value, "baz", "") assert.Equal(t, e.EtcdIndex, eidx, "") @@ -368,6 +369,7 @@ func TestStoreUpdateDirTTL(t *testing.T) { s.Create("/foo", true, "", false, TTLOptionSet{ExpireTime: Permanent}) s.Create("/foo/bar", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) e, err := s.Update("/foo", "", TTLOptionSet{ExpireTime: fc.Now().Add(500 * time.Millisecond)}) + assert.Nil(t, err, "") assert.Equal(t, e.Node.Dir, true, "") assert.Equal(t, e.EtcdIndex, eidx, "") e, _ = s.Get("/foo/bar", false, false) @@ -911,6 +913,7 @@ func TestStoreRecover(t *testing.T) { s.Update("/foo/x", "barbar", TTLOptionSet{ExpireTime: Permanent}) s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: Permanent}) b, err := s.Save() + assert.Nil(t, err, "") s2 := newStore() s2.Recovery(b) @@ -940,6 +943,7 @@ func TestStoreRecoverWithExpiration(t *testing.T) { s.Create("/foo/x", false, "bar", false, TTLOptionSet{ExpireTime: Permanent}) s.Create("/foo/y", false, "baz", false, TTLOptionSet{ExpireTime: fc.Now().Add(5 * time.Millisecond)}) b, err := s.Save() + assert.Nil(t, err, "") time.Sleep(10 * time.Millisecond) diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 6dd63f3c541..13c23e391d9 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -116,7 +116,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde } func (wh *watcherHub) add(e *Event) { - e = wh.EventHistory.addEvent(e) + wh.EventHistory.addEvent(e) } // notify function accepts an event and notify to the watchers. diff --git a/wal/repair_test.go b/wal/repair_test.go index 313501dace8..f72faae496d 100644 --- a/wal/repair_test.go +++ b/wal/repair_test.go @@ -49,7 +49,11 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect defer os.RemoveAll(p) // create WAL w, err := Create(p, nil) - defer w.Close() + defer func() { + if err = w.Close(); err != nil { + t.Fatal(err) + } + }() if err != nil { t.Fatal(err) } diff --git a/wal/wal_test.go b/wal/wal_test.go index a762ee255c6..1be7ef1e91f 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -546,7 +546,11 @@ func TestReleaseLockTo(t *testing.T) { defer os.RemoveAll(p) // create WAL w, err := Create(p, nil) - defer w.Close() + defer func() { + if err = w.Close(); err != nil { + t.Fatal(err) + } + }() if err != nil { t.Fatal(err) } @@ -712,7 +716,11 @@ func TestOpenOnTornWrite(t *testing.T) { } defer os.RemoveAll(p) w, err := Create(p, nil) - defer w.Close() + defer func() { + if err = w.Close(); err != nil && err != os.ErrInvalid { + t.Fatal(err) + } + }() if err != nil { t.Fatal(err) }