-
Notifications
You must be signed in to change notification settings - Fork 1
/
acceptor.cpp
86 lines (76 loc) · 2.98 KB
/
acceptor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//
// Created by David Chu on 10/4/20.
//
#include "acceptor.hpp"

#include <cstdio>
#include <utility>
/**
 * Constructs an acceptor for the given acceptor group and starts serving proxy leaders.
 *
 * In order, the constructor:
 *  1. creates the metrics counters used by this acceptor,
 *  2. creates the network object and a write-only Anna client that publishes the group id and
 *     this node's IP address under the configured keys,
 *  3. creates the server component that accepts proxy-leader connections and routes their
 *     payloads to listenToProxyLeaders(),
 *  4. calls network::poll().
 *
 * NOTE(review): poll() presumably runs the event loop and does not return under normal
 * operation — confirm against network::poll.
 *
 * @param acceptorGroupId identifier of the acceptor group this instance belongs to; moved into the member.
 */
acceptor::acceptor(std::string&& acceptorGroupId) : acceptorGroupId(std::move(acceptorGroupId)) {
    // The parameter shadows the member and has just been moved from, so everything below
    // must read the member explicitly through `this->`.
    metricsVars = metrics::createMetricsVars(
            {metrics::NumProcessedMessages, metrics::P1BPreempted, metrics::P1BSuccess, metrics::P2BPreempted},
            {}, {}, {},
            "acceptor" + this->acceptorGroupId);

    zmqNetwork = new network();
    annaClient = anna::writeOnly(zmqNetwork, {
            {config::KEY_ACCEPTOR_GROUPS, this->acceptorGroupId},
            {this->acceptorGroupId, config::IP_ADDRESS}
    });

    proxyLeaders = new server_component(zmqNetwork, config::ACCEPTOR_PORT_FOR_PROXY_LEADERS, ProxyLeader,
            [](const std::string& address, const time_t now) {
                BENCHMARK_LOG("Proxy leader from {} connected to acceptor", address);
            },
            // The callback is stored by the server component, so capture only `this` rather than
            // every constructor local by reference.
            [this](const network::addressPayloadsMap& addressToPayloads, const time_t now) {
                listenToProxyLeaders(addressToPayloads);
            });

    zmqNetwork->poll();
}
/**
 * Handles a batch of messages received from proxy leaders and replies to each sender.
 *
 * Every payload is parsed as a ProposerToAcceptor message and dispatched on its type:
 *  - p1a: if the message's ballot is greater than highestBallot, highestBallot is replaced by it.
 *         The reply is a P1B built from the (possibly updated) highestBallot and the log.
 *  - p2a: unless highestBallot is greater than the message's ballot, the proposed value is stored
 *         in the log at the requested slot and highestBallot is set to the message's ballot.
 *         The reply is a P2B carrying highestBallot and the slot.
 *
 * Payloads that fail to parse, or whose type is neither p1a nor p2a, are logged and dropped
 * without a reply. Previously such messages caused the reply computed for the preceding
 * message (or an empty string) to be sent back.
 *
 * @param addressToPayloads payloads grouped by the address they arrived from; each reply is sent
 *                          back to the address its payload came from.
 */
void acceptor::listenToProxyLeaders(const network::addressPayloadsMap& addressToPayloads) {
    // Reused across iterations; ParseFromString() clears any previous contents before parsing,
    // so no explicit Clear() is needed between messages.
    ProposerToAcceptor proposerToAcceptor;

    for (const auto& [address, payloads] : addressToPayloads) {
        for (const std::string& payload : payloads) {
            if (!proposerToAcceptor.ParseFromString(payload)) {
                // Never act on a message that could not be fully decoded.
                LOG("Dropping malformed payload of {} bytes", payload.size());
                continue;
            }

            // Scoped per message so a reply can never leak from one message to the next.
            std::string reply;
            switch (proposerToAcceptor.type()) {
                case ProposerToAcceptor_Type_p1a: {
                    BENCHMARK_LOG("Received p1a: {}, highestBallot: {}", proposerToAcceptor.ShortDebugString(),
                                  highestBallot.ShortDebugString());
                    if (Log::isBallotGreaterThan(proposerToAcceptor.ballot(), highestBallot)) {
                        highestBallot = proposerToAcceptor.ballot();
                        metricsVars->counters[metrics::P1BSuccess]->Increment();
                    }
                    else {
                        metricsVars->counters[metrics::P1BPreempted]->Increment();
                    }
                    reply = message::createP1B(proposerToAcceptor.messageid(), acceptorGroupId, highestBallot, log)
                            .SerializeAsString();
                    break;
                }
                case ProposerToAcceptor_Type_p2a: {
                    LOG("Received p2a: {}", proposerToAcceptor.ShortDebugString());
                    TIME();
                    // Accept when the incoming ballot is at least as high as the highest one seen.
                    if (!Log::isBallotGreaterThan(highestBallot, proposerToAcceptor.ballot())) {
                        PValue pValue;
                        pValue.set_client(proposerToAcceptor.client());
                        pValue.set_payload(proposerToAcceptor.payload());
                        *pValue.mutable_ballot() = proposerToAcceptor.ballot();
                        log[proposerToAcceptor.slot()] = std::move(pValue);
                        highestBallot = proposerToAcceptor.ballot();
                        metricsVars->counters[metrics::NumProcessedMessages]->Increment();
                    }
                    else {
                        metricsVars->counters[metrics::P2BPreempted]->Increment();
                    }
                    reply = message::createP2B(proposerToAcceptor.messageid(), acceptorGroupId, highestBallot,
                                               proposerToAcceptor.slot()).SerializeAsString();
                    TIME();
                    break;
                }
                default:
                    // Unknown message type: there is nothing meaningful to answer, so skip the send.
                    LOG("Ignoring message of unexpected type: {}", proposerToAcceptor.ShortDebugString());
                    continue;
            }

            proxyLeaders->sendToIp(address, reply);
        }
    }
}
/**
 * Program entry point.
 *
 * Expects exactly one command-line argument, the acceptor group id. On a wrong argument count
 * the usage line is written to stderr and the process returns a non-zero status so that
 * scripts and supervisors can detect the misuse. Otherwise the logger is initialised and an
 * acceptor is constructed for the given group id.
 *
 * @param argc number of command-line arguments, including the program name.
 * @param argv argument vector; argv[1] is the acceptor group id.
 * @return 1 on a usage error, 0 once the acceptor's constructor has returned.
 */
int main(const int argc, const char** argv) {
    if (argc != 2) {
        std::fprintf(stderr, "Usage: ./acceptor <ACCEPTOR GROUP ID>\n");
        return 1;
    }

    INIT_LOGGER();
    // All of the acceptor's work is started from its constructor.
    acceptor a {argv[1]};
    return 0;
}