forked from hstern/pynmsg
-
Notifications
You must be signed in to change notification settings - Fork 1
/
nmsg_output.pyx
125 lines (98 loc) · 3.79 KB
/
nmsg_output.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def output_open_file(obj, size_t bufsz=NMSG_WBUFSZ_MAX):
if type(obj) == str:
obj = open(obj, 'w')
o = output()
o._open_file(obj, bufsz)
o.fileobj = obj
return o
def output_open_sock(addr, port, size_t bufsz=NMSG_WBUFSZ_ETHER, broadcast=False):
obj = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
obj.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if broadcast:
obj.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
obj.connect((addr, port))
o = output()
o._open_sock(obj, bufsz)
o.fileobj = obj
return o
def output_open_callback(func):
o = output()
o._open_callback(func)
o.func = func
return o
cdef void callback(nmsg_message_t _msg, void *user) with gil:
cdef _recv_message msg
msg = _recv_message()
msg.set_instance(_msg)
(<object>user)(msg)
cdef class output(object):
cdef nmsg_output_t _instance
cdef public object fileobj
cdef public object func
cdef str output_type
open_file = staticmethod(output_open_file)
open_sock = staticmethod(output_open_sock)
open_callback = staticmethod(output_open_callback)
def __cinit__(self):
self._instance = NULL
def __dealloc__(self):
if self._instance != NULL:
nmsg_output_close(&self._instance)
def __repr__(self):
return 'nmsg output object type=%s _instance=0x%x' % (self.output_type, <uint64_t> self._instance)
cpdef _open_file(self, fileobj, size_t bufsz):
self._instance = nmsg_output_open_file(fileobj.fileno(), bufsz)
if self._instance == NULL:
raise Exception, 'nmsg_output_open_file() failed'
self.output_type = 'file'
cpdef _open_sock(self, fileobj, size_t bufsz):
self._instance = nmsg_output_open_sock(fileobj.fileno(), bufsz)
if self._instance == NULL:
raise Exception, 'nmsg_output_open_sock() failed'
self.output_type = 'socket'
cpdef _open_callback(self, object func):
self._instance = nmsg_output_open_callback(<nmsg_cb_message>callback, <void*>func)
if self._instance == NULL:
raise Exception, 'nmsg_output_open_callback() failed'
self.output_type = 'callback'
def set_filter_msgtype(self, vid, msgtype):
if self._instance == NULL:
raise Exception, 'object not initialized'
if type(vid) == str:
vid = msgmod_vname_to_vid(vid)
if type(msgtype) == str:
msgtype = msgmod_mname_to_msgtype(vid, msgtype)
nmsg_output_set_filter_msgtype(self._instance, vid, msgtype)
def close(self):
nmsg_output_close(&self._instance)
self._instance = NULL
def set_buffered(self, bool buffered):
if self._instance != NULL:
nmsg_output_set_buffered(self._instance, buffered)
def set_zlibout(self, bool zlibout):
if self._instance != NULL:
nmsg_output_set_zlibout(self._instance, zlibout)
def flush(self):
cdef nmsg_res res
res = nmsg_output_flush(self._instance)
if res != nmsg_res_success:
raise Exception, 'nmsg_output_flush() failed'
def write(self, message msg):
cdef nmsg_res res
cdef nmsg_message_t _msg_instance
if self._instance == NULL:
raise Exception, 'object not initialized'
if not msg:
return
if msg._instance == NULL:
msg.reinit()
if msg.changed:
msg.sync_message()
msg.sync_fields()
_msg_instance = msg._instance
msg._instance = NULL
res = nmsg_output_write(self._instance, _msg_instance)
if res != nmsg_res_success:
nmsg_message_destroy(&_msg_instance)
raise Exception, 'nmsg_output_write() failed'
nmsg_message_destroy(&_msg_instance)