-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathserver.go
102 lines (84 loc) · 2.46 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package raft
import (
"context"
"io"
pb "github.com/bbengfort/raft/api/v1beta1"
)
// Commit a client request to append some entry to the log.
//
// If this replica is not the leader, a redirect reply built by makeRedirect
// is returned immediately with a nil error. Otherwise the request is recorded
// with the replica's metrics, a CommitRequestEvent is dispatched, and the
// call waits for a reply on the event's source channel.
//
// Returns the reply received on the source channel, the error from Dispatch
// if the event could not be dispatched, or ctx.Err() if ctx is done before a
// reply arrives.
func (r *Replica) Commit(ctx context.Context, in *pb.CommitRequest) (*pb.CommitReply, error) {
	// If the replica is not the leader, forward to the leader.
	if r.leader != r.Name {
		return r.makeRedirect(), nil
	}

	// Record the request in the background so metrics never delay the RPC.
	go func() { r.Metrics.Request(in.Identity) }()

	// Buffered with capacity one so that a single reply sent after this call
	// has already returned (e.g. on cancellation) does not block its sender.
	source := make(chan *pb.CommitReply, 1)

	// Dispatch the event and wait for it to be handled.
	evt := &event{etype: CommitRequestEvent, source: source, value: in}
	if err := r.Dispatch(evt); err != nil {
		return nil, err
	}

	// Stop waiting if the caller gives up; otherwise this goroutine would
	// stay parked on the channel for as long as the reply takes to arrive.
	select {
	case out := <-source:
		return out, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// makeRedirect builds the unsuccessful CommitReply sent when this replica
// does not handle a commit itself. The Redirect field carries r.leader; the
// Error field is "redirect" when a leader is known and "no leader available"
// when r.leader is empty.
func (r *Replica) makeRedirect() *pb.CommitReply {
	// Start from the "unknown leader" message and upgrade it when possible.
	reason := "no leader available"
	if r.leader != "" {
		reason = "redirect"
	}

	reply := &pb.CommitReply{Success: false, Entry: nil}
	reply.Redirect = r.leader
	reply.Error = reason
	return reply
}
// RequestVote from peers whose election timeout has elapsed.
//
// The request is wrapped in a VoteRequestEvent and dispatched; the call then
// waits for a reply on the event's source channel.
//
// Returns the reply received on the source channel, the error from Dispatch
// if the event could not be dispatched, or ctx.Err() if ctx is done before a
// reply arrives.
func (r *Replica) RequestVote(ctx context.Context, in *pb.VoteRequest) (*pb.VoteReply, error) {
	// Buffered with capacity one so that a single reply sent after this call
	// has already returned (e.g. on cancellation) does not block its sender.
	source := make(chan *pb.VoteReply, 1)

	// Dispatch the event received and wait for it to be handled.
	evt := &event{
		etype:  VoteRequestEvent,
		source: source,
		value:  in,
	}
	if err := r.Dispatch(evt); err != nil {
		return nil, err
	}

	// Stop waiting if the caller gives up; otherwise this goroutine would
	// stay parked on the channel for as long as the reply takes to arrive.
	select {
	case out := <-source:
		return out, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// AppendEntries from leader for either heartbeat or consensus.
//
// The method loops over the stream: each received message is dispatched as an
// AppendRequestEvent, and the reply received on the event's source channel is
// sent back on the stream before the next message is read.
//
// Returns nil when the stream is closed by the sender (io.EOF from Recv), the
// error from Recv, Dispatch or Send if any of them fails, or the stream
// context's error if the context is done while waiting for a reply.
func (r *Replica) AppendEntries(stream pb.Raft_AppendEntriesServer) (err error) {
	// The stream's context is used to abandon a pending wait once the RPC
	// itself has been terminated.
	ctx := stream.Context()

	// Keep receiving messages on the stream and sending replies after each
	// message is sent on the stream.
	for {
		var in *pb.AppendRequest
		if in, err = stream.Recv(); err != nil {
			if err == io.EOF {
				// Sender finished the stream cleanly.
				return nil
			}
			return err
		}

		// Buffered with capacity one so that a single reply sent after this
		// handler has already returned does not block its sender.
		source := make(chan *pb.AppendReply, 1)

		// Dispatch the event received and wait for it to be handled.
		evt := &event{
			etype:  AppendRequestEvent,
			source: source,
			value:  in,
		}
		if err = r.Dispatch(evt); err != nil {
			return err
		}

		// Wait for the event to be handled before receiving the next
		// message on the stream; this ensures that the order of messages
		// received matches the order of replies sent. Give up if the stream
		// is torn down in the meantime instead of blocking indefinitely.
		select {
		case out := <-source:
			if err = stream.Send(out); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}