-
Notifications
You must be signed in to change notification settings - Fork 1
/
unbatcher.cpp
74 lines (60 loc) · 2.48 KB
/
unbatcher.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//
// Created by David Chu on 11/11/20.
//
#include "unbatcher.hpp"
/**
 * Builds every collaborator the unbatcher needs and then starts polling the network.
 *
 * Steps, in order:
 *   1. create the incoming/outgoing message counters under the "unbatcher" name;
 *   2. create the network object and the annaClient (via anna::writeOnly);
 *   3. create the client-facing component, whose handlers only log;
 *   4. create the proxy-leader-facing server component, whose payload handler
 *      forwards to listenToProxyLeaders();
 *   5. start the proxy-leader heartbeater and call zmqNetwork->poll().
 *
 * NOTE(review): poll() presumably does not return while the process is healthy,
 * which is what keeps the `this` captured below valid — confirm in network.
 */
unbatcher::unbatcher() {
    metricsVars = metrics::createMetricsVars(
            {metrics::NumIncomingMessages, metrics::NumOutgoingMessages}, {}, {}, {}, "unbatcher");

    zmqNetwork = new network();
    annaClient = anna::writeOnly(zmqNetwork, {{config::KEY_UNBATCHERS, config::IP_ADDRESS}});

    // Client-side handlers: the unbatcher only pushes to clients, so a disconnect
    // or any inbound client traffic is reported as suspicious.
    const auto onClientConnected = [](const std::string& clientAddress, const time_t) {
        BENCHMARK_LOG("Unbatcher connected to client at {}", clientAddress);
    };
    const auto onClientDisconnected = [](const std::string& clientAddress, const time_t) {
        BENCHMARK_LOG("ERROR??: Unbatcher disconnected from client at {}", clientAddress);
    };
    const auto onClientPayloads = [](const network::addressPayloadsMap& received, const time_t) {
        for (const auto&[clientAddress, ignoredPayloads] : received) {
            BENCHMARK_LOG("ERROR??: Client at {} sent unbatcher something", clientAddress);
        }
    };
    clients = new client_component(zmqNetwork, config::CLIENT_PORT_FOR_UNBATCHERS, Client,
                                   onClientConnected, onClientDisconnected, onClientPayloads);

    // Proxy-leader-side handlers: log new connections, unbatch whatever they send.
    const auto onProxyLeaderConnected = [](const std::string& leaderAddress, const time_t) {
        BENCHMARK_LOG("Proxy leader from {} connected to unbatcher", leaderAddress);
    };
    const auto onProxyLeaderPayloads = [this](const network::addressPayloadsMap& received, const time_t) {
        listenToProxyLeaders(received);
    };
    proxyLeaders = new server_component(zmqNetwork, config::UNBATCHER_PORT_FOR_PROXY_LEADERS, ProxyLeader,
                                        onProxyLeaderConnected, onProxyLeaderPayloads);

    proxyLeaders->startHeartbeater();
    zmqNetwork->poll();
}
/**
 * Unpacks batches received from proxy leaders and forwards each contained
 * request to the client named in the batch.
 *
 * For every payload:
 *   - the payload is parsed into a Batch; payloads that fail to parse are
 *     logged and skipped instead of being acted on with unreliable fields;
 *   - if the batch's client is not yet connected, a connection is requested;
 *   - batch.request() is split on the first character of
 *     config::REQUEST_DELIMITER and every piece is sent to that client.
 *
 * NumIncomingMessages is bumped once per sender by the number of payloads
 * received (including any that later fail to parse); NumOutgoingMessages is
 * bumped once per forwarded request.
 *
 * @param addressToPayloads payloads received in this poll round, grouped by
 *                          the sender's address.
 */
void unbatcher::listenToProxyLeaders(const network::addressPayloadsMap& addressToPayloads) {
    // One message object is reused for every payload to avoid re-allocating it.
    Batch batch;
    for (const auto&[address, payloads] : addressToPayloads) {
        metricsVars->counters[metrics::NumIncomingMessages]->Increment(payloads.size());
        for (const std::string& payload : payloads) {
            // ParseFromString reports failure through its return value; a failed
            // parse must not be used to pick a client or to produce requests.
            if (!batch.ParseFromString(payload)) {
                BENCHMARK_LOG("ERROR??: Unbatcher could not parse payload from proxy leader at {}", address);
                batch.Clear();
                continue;
            }
            LOG("Unbatcher received payload: {}", batch.ShortDebugString());
            TIME();

            // Valid until `batch` is next modified (the Clear() at the end of this iteration).
            const auto& client = batch.client();
            if (!clients->isConnected(client)) {
                clients->connectToNewMembers({{client},{}}, 0);
                BENCHMARK_LOG("Unbatcher connecting to client at {}", client);
            }

            //split request
            std::stringstream stream(batch.request());
            std::string request;
            while (std::getline(stream, request, config::REQUEST_DELIMITER[0])) {
                LOG("Sending split request: {}", request);
                clients->sendToIp(client, request);
                metricsVars->counters[metrics::NumOutgoingMessages]->Increment();
            }
            TIME();
            batch.Clear();
        }
    }
}
/**
 * Program entry point for the unbatcher process.
 *
 * The binary takes no command-line arguments. When any are supplied, the usage
 * line is written to stderr and the process ends with a failure status so that
 * scripts and supervisors can detect the bad invocation. Otherwise the logger
 * is initialised and an unbatcher is constructed; all further work happens
 * inside its constructor.
 *
 * @param argc number of command-line arguments, including the program name.
 * @param argv command-line arguments (unused).
 * @return EXIT_FAILURE on a usage error, EXIT_SUCCESS otherwise.
 */
int main(const int argc, const char** argv) {
    if (argc != 1) {
        fprintf(stderr, "Usage: ./unbatcher\n");
        return EXIT_FAILURE;
    }
    INIT_LOGGER();
    unbatcher u {};
    return EXIT_SUCCESS;
}