From 49faa22289894f79b635ef9d2ab350008265eff3 Mon Sep 17 00:00:00 2001 From: SUMUKHA-PK Date: Mon, 26 Oct 2020 16:58:39 +0530 Subject: [PATCH] this commit makes the consensus module robust to node failures and solves #67 --- go.mod | 3 +++ go.sum | 6 ++++++ internal/network/tcp_conn.go | 1 - internal/raft/cluster/tcp_cluster.go | 21 +++++++++++++-------- internal/raft/raft.go | 26 +++++++++++++++----------- internal/raft/raft_test.go | 1 + internal/raft/request_votes.go | 6 +++++- 7 files changed, 43 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 8b4a778b..e1696d96 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,15 @@ module github.com/xqueries/xdb go 1.13 require ( + github.com/fortytw2/leaktest v1.3.0 github.com/golang/protobuf v1.4.2 github.com/google/go-cmp v0.5.2 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/oklog/ulid v1.3.1 + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pkg/errors v0.8.1 github.com/rs/zerolog v1.19.0 + github.com/sasha-s/go-deadlock v0.2.0 github.com/spf13/afero v1.3.5 github.com/spf13/cobra v1.0.0 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index cf132afa..d4eaa58a 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify 
v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -90,6 +92,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -113,6 +117,8 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg= github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y= +github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= diff --git a/internal/network/tcp_conn.go b/internal/network/tcp_conn.go index fc7d3c1b..ac82a742 100644 --- a/internal/network/tcp_conn.go +++ b/internal/network/tcp_conn.go @@ -217,7 +217,6 @@ func 
(c *tcpConn) receiveAsync() chan interface{} { func (c *tcpConn) Close() error { if atomic.LoadInt32(&c.closed) == 1 { - fmt.Println("idena") return nil } diff --git a/internal/raft/cluster/tcp_cluster.go b/internal/raft/cluster/tcp_cluster.go index 65aa68d5..a6bc7ff3 100644 --- a/internal/raft/cluster/tcp_cluster.go +++ b/internal/raft/cluster/tcp_cluster.go @@ -32,7 +32,7 @@ type tcpCluster struct { started chan struct{} startedClosed bool closed int32 - + closeChan chan struct{} // lock needed for atomic write on startedClosed. mu sync.Mutex } @@ -56,6 +56,7 @@ func newTCPCluster(log zerolog.Logger) *tcpCluster { messages: make(chan incomingPayload, tcpClusterMessageQueueBufferSize), started: make(chan struct{}), startedClosed: false, + closeChan: make(chan struct{}, 1), } } @@ -147,10 +148,8 @@ func (c *tcpCluster) Broadcast(ctx context.Context, msg message.Message) error { // After Close is called on this cluster, it is no longer usable. func (c *tcpCluster) Close() error { if atomic.LoadInt32(&c.closed) == 1 { - fmt.Println("XXX") return nil } - fmt.Println("In") atomic.StoreInt32(&c.closed, 1) // close all connections @@ -164,7 +163,8 @@ func (c *tcpCluster) Close() error { errs.Go(c.server.Close) // close the message queue - close(c.messages) + c.closeChan <- struct{}{} + //close(c.messages) return errs.Wait() } @@ -267,10 +267,15 @@ func (c *tcpCluster) receiveMessages(conn network.Conn) { return // abort this goroutine } - // push payload and connection onto the message queue - c.messages <- incomingPayload{ - origin: conn, - payload: data, + select { + case <-c.closeChan: + close(c.messages) + default: + // push payload and connection onto the message queue + c.messages <- incomingPayload{ + origin: conn, + payload: data, + } } } } diff --git a/internal/raft/raft.go b/internal/raft/raft.go index 801d4d38..efb18c21 100644 --- a/internal/raft/raft.go +++ b/internal/raft/raft.go @@ -6,11 +6,11 @@ import ( "io" "log" "math/rand" - "sync" "sync/atomic" "time" 
"github.com/rs/zerolog" + "github.com/sasha-s/go-deadlock" "github.com/xqueries/xdb/internal/id" "github.com/xqueries/xdb/internal/network" "github.com/xqueries/xdb/internal/raft/message" @@ -53,7 +53,7 @@ type PersistentState struct { LeaderID id.ID // LeaderID is nil at init, and the ID of the node after the leader is elected. PeerIPs []network.Conn // PeerIPs has the connection variables of all the other nodes in the cluster. ConnIDMap map[id.ID]int // ConnIDMap has a mapping of the ID of the server to its connection. - mu sync.Mutex + mu deadlock.Mutex } // VolatileState describes the volatile state data on a raft node. @@ -78,8 +78,9 @@ type SimpleServer struct { onReplication ReplicationHandler log zerolog.Logger timeoutProvider func(*Node) *time.Timer - lock sync.Mutex + lock deadlock.Mutex + // Function setters. onRequestVotes func(network.Conn) onLeaderElected func() onAppendEntriesRequest func(network.Conn) @@ -180,7 +181,14 @@ func (s *SimpleServer) Start(ctx context.Context) (err error) { // Listen forever on all node connections. go func() { for { - // Parallely start waiting for incoming data. + s.node.PersistentState.mu.Lock() + if s.node.Closed { + s.node.PersistentState.mu.Unlock() + return + } + s.node.PersistentState.mu.Unlock() + + // Parallelly start waiting for incoming data. conn, msg, err := s.cluster.Receive(ctx) if err != nil { // log.Printf("error in receiving from the cluster: %v\n", err) @@ -234,7 +242,6 @@ func (s *SimpleServer) Start(ctx context.Context) (err error) { s.lock.Unlock() } case data := <-liveChan: - fmt.Printf("took data %v\n", data.msg.Kind()) err = s.processIncomingData(data) if err != nil { log.Printf("error in processing data: %v\n", err) @@ -275,7 +282,6 @@ func (s *SimpleServer) Input(input *message.Command) { func (s *SimpleServer) Close() error { s.lock.Lock() // Maintaining idempotency of the close function. 
- fmt.Println(s.node.Closed) if s.node.Closed { return network.ErrClosed } @@ -291,13 +297,12 @@ func (s *SimpleServer) Close() error { err := s.cluster.Close() s.lock.Unlock() - fmt.Println(err) return err } // randomTimer returns tickers ranging from 150ms to 300ms. func randomTimer(node *Node) *time.Timer { - randomInt := rand.Intn(150) + 450 + randomInt := rand.Intn(150) + 150 node.log. Debug(). Str("self-id", node.PersistentState.SelfID.String()). @@ -325,13 +330,10 @@ func (s *SimpleServer) processIncomingData(data *incomingData) error { if err != nil { return err } - fmt.Println("Finished processing request vote") case message.KindRequestVoteResponse: requestVoteResponse := data.msg.(*message.RequestVoteResponse) if requestVoteResponse.GetVoteGranted() { - fmt.Println("trying lock") s.lock.Lock() - fmt.Println("got lock") voterID := s.getNodeID(data.conn) s.node.log. Debug(). @@ -481,6 +483,7 @@ func (s *SimpleServer) getNodeID(conn network.Conn) id.ID { s.node.PersistentState.mu.Lock() for k, v := range s.node.PersistentState.ConnIDMap { if s.node.PersistentState.PeerIPs[v] == conn { + s.node.PersistentState.mu.Unlock() return k } } @@ -499,6 +502,7 @@ func (s *SimpleServer) getNextIndex(conn network.Conn) (int, int) { s.node.PersistentState.mu.Lock() for i := range s.node.PersistentState.PeerIPs { if conn == s.node.PersistentState.PeerIPs[i] { + s.node.PersistentState.mu.Unlock() return s.node.VolatileStateLeader.NextIndex[i], i } } diff --git a/internal/raft/raft_test.go b/internal/raft/raft_test.go index d9d1bba3..b13a5eeb 100644 --- a/internal/raft/raft_test.go +++ b/internal/raft/raft_test.go @@ -242,6 +242,7 @@ func timeoutProvider(node *Node) *time.Timer { } func TestIntegration(t *testing.T) { + log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel()) assert := assert.New(t) diff --git a/internal/raft/request_votes.go b/internal/raft/request_votes.go index afabaabb..1c1ba168 100644 --- a/internal/raft/request_votes.go +++ 
b/internal/raft/request_votes.go @@ -66,7 +66,10 @@ func (s *SimpleServer) RequestVoteResponse(req *message.RequestVoteRequest) *mes s.node.PersistentState.mu.Lock() // If this node hasn't voted for any other node, vote only then. // TODO: Check whether candidate's log is at least as up to date as mine only then grant vote. - if s.node.PersistentState.VotedFor == nil { // } && currentTerm == req.GetTerm() { + isSelfTermLesser := currentTerm < req.GetTerm() + isSelfTermEqual := currentTerm == req.GetTerm() + hasVotedYet := isSelfTermEqual && (s.node.PersistentState.VotedFor == nil) + if isSelfTermLesser || hasVotedYet { cID, err := id.Parse(req.CandidateID) if err != nil { // no point in handling this because I really need that to parse into ID. @@ -88,6 +91,7 @@ func (s *SimpleServer) RequestVoteResponse(req *message.RequestVoteRequest) *mes } s.node.PersistentState.mu.Unlock() + fmt.Println("Am I falsing here") return &message.RequestVoteResponse{ Term: currentTerm, VoteGranted: false,