forked from repu1sion/tracepulse
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tracepulse.c
325 lines (266 loc) · 8.94 KB
/
tracepulse.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
//gcc tracepulse.c -g -o tracepulse -ltrace -I/root/libtrace/
//gcc tracepulse.c -o tracepulse -ltrace && ./tracepulse
//gcc tracepulse.c -o tracepulse -ltrace -I/mnt/raw/gdwk/libtrace/ && sudo ./tracepulse 4
//sudo ./tracepulse 4 ring:eth0 erf:1.erf
//tracepulse 4 odp:"01:00.1" erf:trace.erf.gz
//combiner: we use combiner_ordered. so output data stored in ordered way.
// there are 3 combiner types: ordered, unordered, sorted.
//hasher: 3 hashers: balanced, unidirectional, bidirectional, we use balanced one,
// so the data spread packets across the threads in a balanced way
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libtrace_parallel.h> //resides just in /usr/local/include
//#include "lib/libtrace_int.h" //present only in libtrace sources
#include "config.h" //required by libtrace_int.h
#include <signal.h>
//#define O_FILENAME "erf:pkts.erf"
//#define DEBUG
#ifdef DEBUG
#define debug(x...) printf(x)
#else
#define debug(x...)
#endif
typedef struct libtrace_thread_t libtrace_thread_t;
//local storage for each processing thread. should be allocated for every thread
struct t_store
{
uint64_t pkts; //received packets
uint64_t bytes; //received bytes
};
//storage for reporter thread (the only one)
struct r_store
{
uint64_t pkts; //received packets
uint64_t bytes; //received bytes
libtrace_out_t *output; //output descriptor
};
struct sigaction sigact;
char in_uri[512] = {0};
char out_uri[512] = {0};
static int compress_level = -1;
static trace_option_compresstype_t compress_type = TRACE_OPTION_COMPRESSTYPE_NONE;
// PROCESSING THREADS CALLBACKS
// -----------------------------------------------------------------------------
//start callback function
static void* start_cb(libtrace_t *trace, libtrace_thread_t *thread, void *global)
{
/* Create and initialise a counter struct */
struct t_store *ts = (struct t_store*)malloc(sizeof(struct t_store));
if (!ts)
{
printf("<error: can't allocate ram for thread storage!>\n");
return NULL;
}
memset(ts, 0x0, sizeof(struct t_store));
return ts;
}
static void stop_cb(libtrace_t *trace, libtrace_thread_t *thread, void *global, void *tls)
{
struct t_store *ts = (struct t_store*)tls;
libtrace_generic_t gen;
gen.ptr = ts;
//XXX: 0 - order for result, 0 - no order, but we need to add ordering by timestamp!
//could be needed RESULT_PACKET
//Inside it calls:
//libtrace->combiner.publish(libtrace, t->perpkt_num, &libtrace->combiner, &res);
//trace_publish_result(trace, thread, 0, gen, RESULT_USER);
}
// the packet callback
static libtrace_packet_t* packet_cb(libtrace_t *trace, libtrace_thread_t *thread,
void *global, void *tls, libtrace_packet_t *packet)
{
int payloadlen = 0;
struct t_store *ts = (struct t_store*)tls;
int thread_num = trace_get_perpkt_thread_id(thread);
payloadlen = trace_get_payload_length(packet);
ts->pkts++;
ts->bytes += payloadlen;
debug("thread #%d: len: %d, pkts: %lu, bytes: %lu \n", thread_num, payloadlen, ts->pkts, ts->bytes);
// forwarding the packet to the reporter
trace_publish_result(trace, thread, 0, (libtrace_generic_t){.pkt = packet}, RESULT_PACKET);
//by returning NULL we say to libtrace that we are keeping the packet
return NULL;
}
// -----------------------------------------------------------------------------
// REPORTER THREAD CALLBACKS
// -----------------------------------------------------------------------------
/* Starting callback for the reporter thread */
static void *start_reporter_cb(libtrace_t *trace, libtrace_thread_t *thread, void *global)
{
debug("%s(): enter\n", __func__);
//char uri[512] = {0};
struct r_store *rs = (struct r_store*)malloc(sizeof(struct r_store));
if (!rs)
{
printf("<error: can't allocate ram for thread storage!>\n");
return NULL;
}
memset(rs, 0x0, sizeof(struct r_store));
//create output --------------------
//strcpy(uri, O_FILENAME);
rs->output = trace_create_output(out_uri);
if (trace_is_err_output(rs->output))
{
trace_perror_output(rs->output, "%s", out_uri);
return NULL;
}
if (compress_level != -1)
{
if (trace_config_output(rs->output, TRACE_OPTION_OUTPUT_COMPRESS,
&compress_level)==-1)
{
trace_perror_output(rs->output, "Unable to set compression level");
}
}
if (trace_config_output(rs->output, TRACE_OPTION_OUTPUT_COMPRESSTYPE,
&compress_type) == -1)
{
trace_perror_output(rs->output, "Unable to set compression type");
}
trace_start_output(rs->output);
if (trace_is_err_output(rs->output))
{
trace_perror_output(rs->output, "%s", out_uri);
return NULL;
}
debug("%s(): exit\n", __func__);
return rs;
}
// The result callback is invoked for each result that reaches the reporter thread
// (so anytime when someone calls trace_publish_result())
static void result_reporter_cb(libtrace_t *trace, libtrace_thread_t *sender,
void *global, void *tls, libtrace_result_t *result)
{
struct r_store *rs = (struct r_store*)tls;
libtrace_packet_t *pkt;
int payloadlen = 0;
debug("%s()\n", __func__);
pkt = (libtrace_packet_t *)result->value.pkt;
if (pkt)
{
payloadlen = trace_get_payload_length(pkt);
rs->pkts++;
rs->bytes += payloadlen;
debug("pkt in reporter from t #: %d, len: %d, total pkts: %lu, total bytes: %lu \n",
trace_get_perpkt_thread_id(sender), payloadlen, rs->pkts, rs->bytes);
//writing to file
if (result->type == RESULT_PACKET)
{
/* Write the packet to disk */
trace_write_packet(rs->output, pkt);
trace_free_packet(trace, pkt);
}
}
}
//called once in the end for reporter thread?
static void stop_reporter_cb(libtrace_t *trace, libtrace_thread_t *thread,
void *global, void *tls)
{
struct r_store *rs = (struct r_store*)tls;
debug("%s()\n", __func__);
trace_destroy_output(rs->output);
}
// -----------------------------------------------------------------------------
int init()
{
printf("init\n");
return 0;
}
int scrot()
{
int rv = 0;
char cmd[512] = {0};
/*
strcpy(cmd, "scrot ");
strcat(cmd, IMG_NAME);
rv = system(cmd);
printf("scrot execution value is: %d\n", rv);
*/
return rv;
}
//add this to Ctrl-C signal processing
void sigterminating(void *arg)
{
libtrace_t *input = (libtrace_t*)arg;
trace_pstop(input);
}
static void signal_handler(int sig)
{
if (sig == SIGUSR1)
printf("Caught signal SIGUSR1 !\n");
else if (sig == SIGUSR2)
printf("Caught signal SIGUSR2 !\n");
}
int main(int argc, char *argv[])
{
int rv = 0;
int threads_num = 1; //1 thread by default
libtrace_t *input;
char *def_uri = "ring:eth0"; //default uri
libtrace_callback_set_t *processing = NULL, *reporter = NULL;
//rv = init();
if (argc != 4)
{
printf("syntax is: num_treads INPUT OUTPUT\n");
exit(1);
}
else
{
threads_num = atoi(argv[1]);
strcpy(in_uri, argv[2]);
strcpy(out_uri, argv[3]);
}
//signal handling
sigact.sa_handler = signal_handler;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
sigaction(SIGUSR1, &sigact, (struct sigaction *)NULL);
//we create 2 callback sets: for processing and reporter threads
processing = trace_create_callback_set();
trace_set_starting_cb(processing, start_cb);
trace_set_stopping_cb(processing, stop_cb);
trace_set_packet_cb(processing, packet_cb);
reporter = trace_create_callback_set();
trace_set_starting_cb(reporter, start_reporter_cb);
trace_set_stopping_cb(reporter, stop_reporter_cb);
trace_set_result_cb(reporter, result_reporter_cb);
/* Create the input trace object */
input = trace_create(in_uri);
if (trace_is_err(input))
{
trace_perror(input, "error creating trace");
return 1;
}
/* Set the number of processing threads to use.
If not set, libtrace will create one thread for each core it detects on your system. */
printf("set %d threads \n", threads_num);
trace_set_perpkt_threads(input, threads_num);
/* Send every result to the reporter immediately, i.e. do not buffer them. */
//trace_set_reporter_thold(input, 1);
//there are 3 possible combiners: ordered, unordered, sorted. we use ordered.
trace_set_combiner(input, &combiner_unordered, (libtrace_generic_t){0}); //XXX - strange syntax
/* Try to balance our load across all processing threads. If
we were doing flow analysis, we should use
HASHER_BIDIRECTIONAL instead to ensure that all packets for
a given flow end up on the same processing thread. */
trace_set_hasher(input, HASHER_BALANCE, NULL, NULL);
/* Start the parallel trace using our callback sets. The NULL
* parameter here is where we can provide global data for the
input trace -- we don't need any in this example.
Second param is global data available for all callbacks
Third param - callback set for processing threads
Fourth param - callback set for reporter thread */
if (trace_pstart(input, NULL, processing, reporter))
{
trace_perror(input, "Starting parallel trace");
return 1;
}
/* This will wait for all the threads to complete */
trace_join(input);
/* Clean up everything that we've created */
trace_destroy(input);
trace_destroy_callback_set(processing);
trace_destroy_callback_set(reporter);
return rv;
}