-
Notifications
You must be signed in to change notification settings - Fork 1
/
ZooKeeper.hpp
424 lines (371 loc) · 11.4 KB
/
ZooKeeper.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
#ifndef _ZOOKEEPER_HPP_
#define _ZOOKEEPER_HPP_
extern "C" {
#include <zookeeper/zookeeper.h>
#include <zookeeper/proto.h>
}
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstdarg>
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
//extern struct ACL_vector ZOO_OPEN_ACL_UNSAFE;
// errno comes from <cerrno>, which this header already includes. It may be
// implemented as a macro, so it must not be redeclared as `extern int errno;`.
#define ZOOKEEPER_NAMESPACE_BEGIN namespace org { namespace apache { namespace zookeeper {
#define ZOOKEEPER_NAMESPACE_END }}}
#define LOG_NAMESPACE_BEGIN namespace Log {
#define LOG_NAMESPACE_END }
ZOOKEEPER_NAMESPACE_BEGIN
using namespace std;
// Type-safe wrapper around a plain enum.
//
// `def` is a struct exposing a nested `type` (normally `enum type {...}`).
// Deriving from `def` makes its enumerators reachable through the wrapper
// (e.g. safe_enum<Foo_Def>::SOME_VALUE), while the wrapped value itself is
// only reachable through value().
template <typename def, typename type = typename def::type>
class safe_enum : public def {
public:
    // Deliberately non-explicit: a raw enumerator converts implicitly.
    safe_enum(type v) : val(v) {}
    safe_enum(safe_enum const& other) : val(other.val) {}

    safe_enum& operator=(safe_enum const& other) {
        val = other.val;
        return *this;
    }

    // Returns the wrapped raw value.
    type value() const { return val; }

    // Comparisons are all expressed in terms of the raw value's == and <.
    bool operator==(safe_enum const& s) const { return val == s.val; }
    bool operator!=(safe_enum const& s) const { return !(val == s.val); }
    bool operator<(safe_enum const& s) const { return val < s.val; }
    bool operator<=(safe_enum const& s) const { return val < s.val || val == s.val; }
    bool operator>(safe_enum const& s) const { return !(val < s.val || val == s.val); }
    bool operator>=(safe_enum const& s) const { return !(val < s.val); }

private:
    type val;
};
LOG_NAMESPACE_BEGIN
// Severity levels understood by the logging helpers.
// The numeric values are consecutive from 0 because vlog() uses them as an
// index into its table of level names; the trailing dummy enumerator is not
// a real level.
struct LogLevel_Def {
    enum type {
        INFO      = 0,
        WARN      = 1,
        ERROR     = 2,
        ASSERT    = 3,
        DEBUG     = 4,
        EXCEPTION = 5,
        dummy_HDKHKLFHRJKGFGFWWXZZJLGWWIOJJVX = 6
    };
};
// LogLevel wraps LogLevel_Def::type in safe_enum; it is the level argument
// taken by log() and vlog(), and enumerators are written LogLevel::INFO etc.
typedef safe_enum<LogLevel_Def> LogLevel;
// Returns the current local time rendered with the strftime pattern
// "%Y-%m-%d %T" (date, a space, then the time of day).
static string timestamp(){
    char text[64];
    struct tm local_parts;
    time_t now;
    time(&now);
    // localtime_r fills local_parts and hands back a pointer to it.
    const struct tm* broken_down = localtime_r(&now, &local_parts);
    const size_t written = strftime(text, 64, "%Y-%m-%d %T", broken_down);
    text[written] = 0;
    return string(text);
}
// Forward declarations; the definitions follow later in this header.
// vformat/format: build a string from a printf-style format (va_list and
// variadic flavours respectively).
string vformat(string const&,va_list);
string format(string const&,...);
// vlog/log: write one log record to the given stream. The arguments are, in
// order: stream, level, source file, line number, function name, printf-style
// format, then the format arguments (va_list for vlog, variadic for log).
void vlog(ostream&, LogLevel, const char*, const int,
const char*,const char* fmt, va_list ap);
void log(ostream&, LogLevel, const char*, const int,
const char*, const char*, ...);
// LOGi / LOGw / LOGe: write a printf-style message to cerr at INFO / WARN /
// ERROR level, tagged with __FILE__, __LINE__ and __FUNCTION__ of the call
// site. `msg` is the format string; extra arguments are forwarded to log().
// `##__VA_ARGS__` is a GNU extension that drops the comma when no extra
// arguments are given.
// NOTE(review): each expansion already ends in `;` after `while(0)`, so
// `if (c) LOGi("x"); else ...` does not compile; wrap such calls in braces.
#define LOGi(msg,...)\
do{\
log(cerr,LogLevel::INFO,__FILE__,__LINE__,__FUNCTION__,(msg),##__VA_ARGS__);\
}while(0);
#define LOGw(msg,...)\
do{\
log(cerr,LogLevel::WARN,__FILE__,__LINE__,__FUNCTION__,(msg),##__VA_ARGS__);\
}while(0);
#define LOGe(msg,...)\
do{\
log(cerr,LogLevel::ERROR,__FILE__,__LINE__,__FUNCTION__,(msg),##__VA_ARGS__);\
}while(0);
// LOGa: write a printf-style message to cerr at ASSERT level, then evaluate
// assert(boolexpr).
// NOTE(review): the message is logged unconditionally, i.e. also when
// boolexpr holds; confirm whether that is intended.
// NOTE(review): with NDEBUG defined, assert() expands to nothing, so boolexpr
// is neither evaluated nor checked.
#define LOGa(boolexpr,msg,...)\
do{\
log(cerr,LogLevel::ASSERT,__FILE__,__LINE__,__FUNCTION__,(msg),##__VA_ARGS__);\
assert(boolexpr);\
}while(0);
// LOGd: debug-level logging to cerr. It is active only when the DEBUG macro is
// defined; otherwise it expands to an empty statement and its arguments are
// not evaluated.
// NOTE(review): defining DEBUG as a macro also macro-replaces the `DEBUG`
// enumerator in LogLevel_Def and the `LogLevel::DEBUG` token below, which
// looks like it would break compilation of the debug build - verify.
#ifdef DEBUG
#define LOGd(msg,...)\
do{\
log(cerr,LogLevel::DEBUG,__FILE__,__LINE__,__FUNCTION__,(msg),##__VA_ARGS__);\
}while(0);
#else
#define LOGd(msg,...)do{}while(0);
#endif
// THROW: format a printf-style message as an EXCEPTION-level log record into
// a local stringstream (nothing is written to cerr) and throw the resulting
// text. The thrown object is a std::string (the return type of
// stringstream::str()), not a std::exception subclass, so handlers must catch
// a string.
// `msg` is used as the format string: pass variable text as
// THROW("%s", text) rather than THROW(text).
#define THROW(msg,...)\
do{\
stringstream out;\
log(out,LogLevel::EXCEPTION,__FILE__,__LINE__,__FUNCTION__,(msg),##__VA_ARGS__);\
throw out.str();\
}while(0);
// Converts any value that supports `stream << value` into its textual form.
template<typename T>
string to_string(T const& a){
    stringstream buffer;
    buffer << a;
    return buffer.str();
}
// Builds a string from a printf-style format and a variadic argument list by
// delegating to vformat().
//
// Declared `inline` because this definition lives in a header: without it,
// every translation unit that includes the header emits its own external
// definition and the program fails to link.
//
// NOTE(review): `fmt` is a reference, and passing a reference-typed parameter
// to va_start is undefined behaviour per the C++ standard (common compilers
// accept it with a warning). Fixing that requires changing the signature,
// e.g. to `const char*`, which would also affect the forward declaration and
// all callers.
inline string format(const string& fmt,...){
    va_list ap;
    va_start(ap,fmt);
    string s=vformat(fmt,ap);
    va_end(ap);
    return s;
}
// Builds a string from a printf-style format and a va_list.
//
// Returns exactly the formatted text: no trailing NUL padding and no
// truncation, whatever the message length. Throws (via THROW, i.e. a
// std::string) when vsnprintf reports an encoding error.
//
// Declared `inline` because this definition lives in a header.
inline string vformat(const string& fmt,va_list ap){
    // vsnprintf consumes the va_list it is given, so keep a copy in case the
    // first attempt does not fit and a second pass is needed.
    va_list retry;
    va_copy(retry,ap);

    char stack_buf[256];
    const int needed=vsnprintf(stack_buf,sizeof(stack_buf),fmt.c_str(),ap);
    if(needed<0){
        va_end(retry);
        THROW("failed to format s string");
    }
    const size_t length=static_cast<size_t>(needed);
    if(length<sizeof(stack_buf)){
        // Common case: the whole message fitted into the stack buffer.
        va_end(retry);
        return string(stack_buf,length);
    }

    // The output was truncated; vsnprintf told us the full length, so format
    // again into a buffer of exactly that size (+1 for the terminating NUL).
    string s(length+1,'\0');
    vsnprintf(&s[0],s.size(),fmt.c_str(),retry);
    va_end(retry);
    s.resize(length);
    return s;
}
// Variadic front end for vlog(): packs the trailing printf-style arguments
// into a va_list and forwards everything unchanged.
inline void log(ostream& out, LogLevel level, const char* file,
        int line, const char* func, const char* fmt,...){
    va_list args;
    va_start(args,fmt);
    vlog(out,level,file,line,func,fmt,args);
    va_end(args);
}
// Writes one log record to `out` in the form
//   <timestamp> <LEVEL> [<file>] <func>#<line>: <message>\n
// where <message> is `fmt` expanded with the arguments in `ap`.
//
// The record is assembled in a local stringstream and handed to `out` with a
// single insertion. A level outside the known range (for example the dummy
// enumerator) is printed as "UNKNOWN" instead of indexing past the name table.
inline void vlog(ostream& out, LogLevel level, const char* file,
        const int line, const char* func, const char* fmt, va_list ap)
{
    const char* const logs[]={"INFO","WARN","ERROR","ASSERT","DEBUG","EXCEPTION"};
    const size_t known=sizeof(logs)/sizeof(logs[0]);
    const int index=level.value();
    const char* level_name=
        (index>=0 && static_cast<size_t>(index)<known)?logs[index]:"UNKNOWN";

    const string msg=vformat(fmt,ap);
    stringstream pack;
    pack<<timestamp()<<" "
        <<level_name
        <<" ["<<file<<"] "
        <<func<<"#"<<line<<": "
        <<msg<<'\n';
    out<<pack.str();
}
LOG_NAMESPACE_END
using namespace Log;
// Function-pointer type with the same shape as watch_process() below:
// (handle, event type, connection state, znode path, user context).
typedef void (*watch_fn)(zhandle_t*,int,int,const char*,void*);
// Forward declarations of the types defined later in this header.
class ZooKeeper;
struct WatchedEvent;
class Watcher;
// Callback handed to zookeeper_init() by the ZooKeeper constructor; defined
// after the ZooKeeper class.
static void watch_process(zhandle_t*,int,int, const char*,void*);
// Znode creation modes. ZooKeeper::zk_create() passes the numeric value
// straight to zoo_create() as its flags argument, so these numbers must not
// be changed.
// presumably bit 0 = ephemeral and bit 1 = sequential, matching the C
// client's ZOO_EPHEMERAL / ZOO_SEQUENCE flags - verify against zookeeper.h.
struct CreateMode_Def{
enum type{
PERSISTENT=0,
EPHEMERAL=1,
PERSISTENT_SEQUENTIAL=2,
EPHEMERAL_SEQUENTIAL=3,
// Placeholder enumerator, not a real mode.
dummy_PBDFDXBXZWJNVFFJHCGCSSHVVXAAQW
};
};
// Connection/session states of a ZooKeeper handle.
// presumably these numbers mirror the ZOO_*_STATE constants of the C client
// (the `state` argument delivered to watch_process) - verify against
// zookeeper.h before relying on them.
struct KeeperState_Def{
enum type{
Expired = -112,
AuthFailed = -113,
Connecting = 1,
Associating = 2,
Connected = 3,
NotConnected = 999,
// Placeholder enumerator, not a real state (implicitly 1000).
dummy_LHKJITJKGGFHKJFDDHJKGFRDVHHCR
};
};
// Kinds of watch events.
// presumably these numbers mirror the ZOO_*_EVENT constants of the C client
// (the `type` argument delivered to watch_process) - verify against
// zookeeper.h before relying on them.
struct EventType_Def{
enum type{
NodeCreated = 1,
NodeDeleted = 2,
NodeDataChanged = 3,
NodeChildrenChanged = 4,
Session = -1,
NotWatching = -2,
// Placeholder enumerator, not a real event type. It follows NotWatching (-2)
// and therefore implicitly has the value -1, the same as Session.
dummy_BFJHVLKCCKFBDSSSLKBCSGFNBXXCT
};
};
// safe_enum wrappers over the *_Def enums above; enumerators are written
// CreateMode::EPHEMERAL, KeeperState::Connected, EventType::NodeCreated, etc.,
// and the raw value is read with value().
typedef safe_enum<CreateMode_Def> CreateMode;
typedef safe_enum<KeeperState_Def> KeeperState;
typedef safe_enum<EventType_Def> EventType;
// Plain value object describing one watch notification: the znode path it
// concerns plus the raw integer state and event-type codes, stored exactly as
// given to the constructor.
struct WatchedEvent{
    string path;
    int keeperState;
    int eventType;

    // p: znode path, ks: keeper (connection) state code, et: event type code.
    WatchedEvent(string const& p,int ks,int et)
        : path(p)
        , keeperState(ks)
        , eventType(et)
    {}
};
class Watcher{
private:
ZooKeeper *zookeeper;
public:
Watcher(){}
explicit Watcher(ZooKeeper* zk):zookeeper(zk){}
virtual ~Watcher() {}
ZooKeeper* getZooKeeper()const{
return zookeeper;
}
void setZooKeeper(ZooKeeper *zk){
zookeeper = zk;
}
virtual void process(const WatchedEvent& event){
LOGi("Events are processed by default watcher");
}
};
// A RAII wrapper for String_vector struct
class StringVectorWrapper {
private:
String_vector wrapped;
StringVectorWrapper(StringVectorWrapper const&);
StringVectorWrapper& operator=(StringVectorWrapper const&);
public:
StringVectorWrapper(){
wrapped.count = 0;
wrapped.data = NULL;
}
~StringVectorWrapper(){
deallocate_String_vector(&wrapped);
}
operator struct String_vector*(){
return &wrapped;
}
list<string> operator()(){
list<string> slst;
for (int i = 0; i < wrapped.count; ++i){
slst.push_back(wrapped.data[i]);
}
return slst;
}
};
class ZooKeeper{
public:
void watch_callback(const WatchedEvent& event){
LOGi("zk_watcher->process(event)");
zk_watcher->process(event);
}
private:
zhandle_t* zk_handle;
clientid_t zk_clientid;
shared_ptr<Watcher> zk_watcher;
public:
typedef uint8_t byte_t;
typedef long long int64_t;
typedef void (*watch_fn)(zhandle_t*,int,int,const char*,void*);
ZooKeeper(string const& connStr,int sessionTimeout,
shared_ptr<Watcher> watcher,int64_t sessionId,
string const& passwd, int flags){
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
zk_clientid.client_id=sessionId;
size_t passwd_len=sizeof(zk_clientid.passwd);
//cout<<"passwd_len="<<passwd_len<<endl;
//cout<<"passwd_size()"<<passwd.size()<<endl;
assert(passwd.size()==passwd_len);
std::copy(passwd.begin(),passwd.end(),zk_clientid.passwd);
//LOGi("passwd_len=%u",passwd_len);
//LOGi("passwd_size()=%u",passwd.size());
zk_handle = zookeeper_init(connStr.c_str(), watch_process, sessionTimeout,
&zk_clientid, NULL, flags);
if (!zk_handle){
THROW(zerror(errno));
}
zk_watcher=watcher;
zk_watcher->setZooKeeper(this);
zoo_set_context(zk_handle,this);
LOGi("Succeeded in connecting to ZK specified by %s",connStr.c_str());
}
ZooKeeper(string const& connStr, int sessionTimeout, shared_ptr<Watcher> watcher,
int64_t sessionId, string const& passwd):
ZooKeeper(connStr,sessionTimeout,watcher,sessionId,passwd, 0){}
ZooKeeper(string const& connStr, int sessionTimeout, shared_ptr<Watcher> watcher, int flags):
ZooKeeper(connStr,sessionTimeout,watcher,0,string(16,'0'), 0){}
ZooKeeper(string const& connStr, int sessionTimeout,shared_ptr<Watcher> watcher):
ZooKeeper(connStr,sessionTimeout, watcher, 0){}
~ZooKeeper(){
zookeeper_close(zk_handle);
}
string zk_create(string const& path,string const& data,CreateMode createMode){
char path_buff[256]={0};
int rc=zoo_create(zk_handle, path.c_str(), data.c_str(), data.size(),
&ZOO_OPEN_ACL_UNSAFE, createMode.value(),path_buff,256);
if (rc!=ZOK){
LOGe("Failed to create znode %s",path.c_str());
THROW(zerror(rc));
}
LOGi("Succeeded in create zode %s",path.c_str());
return string(path_buff);
}
void zk_delete(string const& path,int version){
int rc=zoo_delete(zk_handle,path.c_str(),version);
if (rc!=ZOK){
LOGe("Failed to delete znode %s",path.c_str());
THROW(zerror(rc));
}
LOGi("Succeeded in delete znode %s",path.c_str());
}
bool zk_exists(string const& path, bool watch){
int rc=zoo_exists(zk_handle, path.c_str(), watch, NULL);
if (rc==ZOK){
LOGi("Znode %s already exists", path.c_str());
return true;
}else if (rc==ZNONODE){
LOGi("Znode %s no exists yet", path.c_str());
return false;
}else {
LOGe("Failed to invoke zk_exists");
THROW(zerror(rc));
return false;
}
}
string zk_getData(string const& path, bool watch){
char buff[256]={0};
int bufflen=256;
int rc=zoo_get(zk_handle,path.c_str(), watch, buff, &bufflen, NULL);
if (rc!=ZOK){
LOGe("Failed to get data of znode %s", path.c_str());
THROW(zerror(rc));
}
LOGi("Succeeded in geting data of znode %s", path.c_str());
string s(bufflen,'0');
std::copy(buff,buff+bufflen,s.begin());
}
void zk_setData(string const& path, string const& data, int version){
int rc=zoo_set(zk_handle,path.c_str(),data.c_str(),
data.size(),version);
if (rc!=ZOK){
LOGe("Failed to set data of znode %s",path.c_str());
THROW(zerror(rc));
}
LOGi("Succeeded in setting data of znode %s",path.c_str());
}
list<string> zk_getChildren(string const& path,bool watch){
//struct String_vector strings;
StringVectorWrapper svw;
//int rc=zoo_get_children(zk_handle,path.c_str(),watch,&strings);
int rc=zoo_get_children(zk_handle,path.c_str(),watch,svw);
if (rc!=ZOK) {
LOGe("Failed to get children of znode %s",path.c_str());
THROW(zerror(rc));
}
return svw();
//list<string> children;
//for (int i=0;i<strings.count;++i){
// children.push_back(strings.data[i]);
//}
//deallocate_String_vector(&strings);
//return children;
}
};
// C callback registered with zookeeper_init() by the ZooKeeper constructor.
// Packs the raw (path, state, type) triple into a WatchedEvent and forwards
// it to the ZooKeeper object carried in the context pointer.
//
//   zh        - handle the event came from (unused here)
//   type      - raw event type code
//   state     - raw connection state code
//   path      - znode path the event refers to; NULL is treated as ""
//   zookeeper - context pointer, expected to be the owning ZooKeeper*
static void watch_process(zhandle_t* zh,int type,int state,
        const char* path,void* zookeeper){
    (void)zh;
    LOGi("Event triggered");
    // Without a context there is nobody to deliver to; dereferencing it would
    // crash the client's callback thread.
    if (zookeeper==NULL){
        LOGw("Watch event dropped: no ZooKeeper context attached");
        return;
    }
    ZooKeeper* zk = static_cast<ZooKeeper*>(zookeeper);
    // std::string cannot be built from a NULL pointer.
    WatchedEvent event(path?path:"",state,type);
    zk->watch_callback(event);
}
ZOOKEEPER_NAMESPACE_END
#endif