Skip to content
This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

Commit

Permalink
this commit makes the consensus module robust to node failures and so…
Browse files Browse the repository at this point in the history
…lves #67
  • Loading branch information
SUMUKHA-PK committed Oct 26, 2020
1 parent f46bf7f commit 49faa22
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 21 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ module github.com/xqueries/xdb
go 1.13

require (
github.com/fortytw2/leaktest v1.3.0
github.com/golang/protobuf v1.4.2
github.com/google/go-cmp v0.5.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/oklog/ulid v1.3.1
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pkg/errors v0.8.1
github.com/rs/zerolog v1.19.0
github.com/sasha-s/go-deadlock v0.2.0
github.com/spf13/afero v1.3.5
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down Expand Up @@ -90,6 +92,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -113,6 +117,8 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg=
github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
Expand Down
1 change: 0 additions & 1 deletion internal/network/tcp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (c *tcpConn) receiveAsync() chan interface{} {

func (c *tcpConn) Close() error {
if atomic.LoadInt32(&c.closed) == 1 {
fmt.Println("idena")
return nil
}

Expand Down
21 changes: 13 additions & 8 deletions internal/raft/cluster/tcp_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type tcpCluster struct {
started chan struct{}
startedClosed bool
closed int32

closeChan chan struct{}
// lock needed for atomic write on startedClosed.
mu sync.Mutex
}
Expand All @@ -56,6 +56,7 @@ func newTCPCluster(log zerolog.Logger) *tcpCluster {
messages: make(chan incomingPayload, tcpClusterMessageQueueBufferSize),
started: make(chan struct{}),
startedClosed: false,
closeChan: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -147,10 +148,8 @@ func (c *tcpCluster) Broadcast(ctx context.Context, msg message.Message) error {
// After Close is called on this cluster, it is no longer usable.
func (c *tcpCluster) Close() error {
if atomic.LoadInt32(&c.closed) == 1 {
fmt.Println("XXX")
return nil
}
fmt.Println("In")
atomic.StoreInt32(&c.closed, 1)

// close all connections
Expand All @@ -164,7 +163,8 @@ func (c *tcpCluster) Close() error {
errs.Go(c.server.Close)

// close the message queue
close(c.messages)
c.closeChan <- struct{}{}
//close(c.messages)

return errs.Wait()
}
Expand Down Expand Up @@ -267,10 +267,15 @@ func (c *tcpCluster) receiveMessages(conn network.Conn) {
return // abort this goroutine
}

// push payload and connection onto the message queue
c.messages <- incomingPayload{
origin: conn,
payload: data,
select {
case <-c.closeChan:
close(c.messages)
default:
// push payload and connection onto the message queue
c.messages <- incomingPayload{
origin: conn,
payload: data,
}
}
}
}
26 changes: 15 additions & 11 deletions internal/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"
"github.com/sasha-s/go-deadlock"
"github.com/xqueries/xdb/internal/id"
"github.com/xqueries/xdb/internal/network"
"github.com/xqueries/xdb/internal/raft/message"
Expand Down Expand Up @@ -53,7 +53,7 @@ type PersistentState struct {
LeaderID id.ID // LeaderID is nil at init, and the ID of the node after the leader is elected.
PeerIPs []network.Conn // PeerIPs has the connection variables of all the other nodes in the cluster.
ConnIDMap map[id.ID]int // ConnIDMap has a mapping of the ID of the server to its connection.
mu sync.Mutex
mu deadlock.Mutex
}

// VolatileState describes the volatile state data on a raft node.
Expand All @@ -78,8 +78,9 @@ type SimpleServer struct {
onReplication ReplicationHandler
log zerolog.Logger
timeoutProvider func(*Node) *time.Timer
lock sync.Mutex
lock deadlock.Mutex

// Function setters.
onRequestVotes func(network.Conn)
onLeaderElected func()
onAppendEntriesRequest func(network.Conn)
Expand Down Expand Up @@ -180,7 +181,14 @@ func (s *SimpleServer) Start(ctx context.Context) (err error) {
// Listen forever on all node connections.
go func() {
for {
// Parallely start waiting for incoming data.
s.node.PersistentState.mu.Lock()
if s.node.Closed {
s.node.PersistentState.mu.Unlock()
return
}
s.node.PersistentState.mu.Unlock()

// Parallelly start waiting for incoming data.
conn, msg, err := s.cluster.Receive(ctx)
if err != nil {
// log.Printf("error in receiving from the cluster: %v\n", err)
Expand Down Expand Up @@ -234,7 +242,6 @@ func (s *SimpleServer) Start(ctx context.Context) (err error) {
s.lock.Unlock()
}
case data := <-liveChan:
fmt.Printf("took data %v\n", data.msg.Kind())
err = s.processIncomingData(data)
if err != nil {
log.Printf("error in processing data: %v\n", err)
Expand Down Expand Up @@ -275,7 +282,6 @@ func (s *SimpleServer) Input(input *message.Command) {
func (s *SimpleServer) Close() error {
s.lock.Lock()
// Maintaining idempotency of the close function.
fmt.Println(s.node.Closed)
if s.node.Closed {
return network.ErrClosed
}
Expand All @@ -291,13 +297,12 @@ func (s *SimpleServer) Close() error {

err := s.cluster.Close()
s.lock.Unlock()
fmt.Println(err)
return err
}

// randomTimer returns timers ranging from 150ms to 300ms.
func randomTimer(node *Node) *time.Timer {
randomInt := rand.Intn(150) + 450
randomInt := rand.Intn(150) + 150
node.log.
Debug().
Str("self-id", node.PersistentState.SelfID.String()).
Expand Down Expand Up @@ -325,13 +330,10 @@ func (s *SimpleServer) processIncomingData(data *incomingData) error {
if err != nil {
return err
}
fmt.Println("Finished processing request vote")
case message.KindRequestVoteResponse:
requestVoteResponse := data.msg.(*message.RequestVoteResponse)
if requestVoteResponse.GetVoteGranted() {
fmt.Println("trying lock")
s.lock.Lock()
fmt.Println("got lock")
voterID := s.getNodeID(data.conn)
s.node.log.
Debug().
Expand Down Expand Up @@ -481,6 +483,7 @@ func (s *SimpleServer) getNodeID(conn network.Conn) id.ID {
s.node.PersistentState.mu.Lock()
for k, v := range s.node.PersistentState.ConnIDMap {
if s.node.PersistentState.PeerIPs[v] == conn {
s.node.PersistentState.mu.Unlock()
return k
}
}
Expand All @@ -499,6 +502,7 @@ func (s *SimpleServer) getNextIndex(conn network.Conn) (int, int) {
s.node.PersistentState.mu.Lock()
for i := range s.node.PersistentState.PeerIPs {
if conn == s.node.PersistentState.PeerIPs[i] {
s.node.PersistentState.mu.Unlock()
return s.node.VolatileStateLeader.NextIndex[i], i
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func timeoutProvider(node *Node) *time.Timer {
}

func TestIntegration(t *testing.T) {

log := zerolog.New(os.Stdout).With().Logger().Level(zerolog.GlobalLevel())

assert := assert.New(t)
Expand Down
6 changes: 5 additions & 1 deletion internal/raft/request_votes.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ func (s *SimpleServer) RequestVoteResponse(req *message.RequestVoteRequest) *mes
s.node.PersistentState.mu.Lock()
// If this node hasn't voted for any other node, vote only then.
// TODO: Check whether candidate's log is at least as up to date as mine only then grant vote.
if s.node.PersistentState.VotedFor == nil { // } && currentTerm == req.GetTerm() {
isSelfTermLesser := currentTerm < req.GetTerm()
isSelfTermEqual := currentTerm == req.GetTerm()
hasVotedYet := isSelfTermEqual && (s.node.PersistentState.VotedFor == nil)
if isSelfTermLesser || hasVotedYet {
cID, err := id.Parse(req.CandidateID)
if err != nil {
// no point in handling this because I really need that to parse into ID.
Expand All @@ -88,6 +91,7 @@ func (s *SimpleServer) RequestVoteResponse(req *message.RequestVoteRequest) *mes
}
s.node.PersistentState.mu.Unlock()

fmt.Println("Am I falsing here")
return &message.RequestVoteResponse{
Term: currentTerm,
VoteGranted: false,
Expand Down

0 comments on commit 49faa22

Please sign in to comment.