-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathodp_test.c
371 lines (317 loc) · 11.8 KB
/
odp_test.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <odp_api.h>
#include <odp/helper/linux.h>
#define UNUSE(x) (x = x)
#define MAX_WORKERS 8
#define MAX_TIMEOUT_WORKERS 4
#define NUM_TMOS 10000
#define WAIT_NUM 10 /**< Max tries to rx last tmo per worker */
/** Get rid of path in filename - only for unix-type paths using '/' */
#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
strrchr((file_name), '/') + 1 : (file_name))
struct test_timer {
odp_timer_t tim;
odp_event_t ev;
};
typedef struct {
int cpu_count;
int timeout_worker_count;
} appl_args_t;
typedef struct {
odp_queue_t queue;
} thread_args_t;
typedef struct {
appl_args_t appl;
thread_args_t thread[MAX_WORKERS];
odp_barrier_t barrier; /*< Barrier for test synchronisation*/
odp_timer_pool_t tp; /*< Timer pool handle*/
odp_pool_t tmop; /*< Timeout pool handle*/
odp_atomic_u32_t remain; /*< Number of timeouts to receive*/
struct test_timer tt[256]; /*< Array of all timer helper structs*/
uint32_t num_workers; /**< Number of threads */
odp_schedule_group_t schedule_group_timer;
} args_t;
/** Global pointer to args */
static args_t *gbl_args;
/**
* Prinf usage information
*/
static void usage(char *progname)
{
printf("\nOpenDataPlane L2 forwarding application.\n"
"\nUsage: %s OPTIONS\n"
" -c <number> CPU count. %s will create worker thread on each CPU core.\n"
" -g <number> Worker thread number in timer schedule group. this should less than CPU count.\n"
" -h, --help Display help and exit.\n\n",
NO_PATH(progname), NO_PATH(progname)
);
}
static void gbl_args_init(args_t *args)
{
memset(args, 0, sizeof(args_t));
args->tmop = ODP_POOL_INVALID;
args->tp = ODP_TIMER_POOL_INVALID;
}
/**
* Parse and store the command line arguments
*
* @param argc argument count
* @param argv[] argument vector
* @param appl_args Store application arguments here
*/
static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
{
int opt;
static const char *shortopts = "c:g:h";
while (1) {
opt = getopt(argc, argv, shortopts);
if (opt == -1)
break; /* No more options */
switch (opt) {
case 'c':
appl_args->cpu_count = atoi(optarg);
break;
case 'g':
appl_args->timeout_worker_count = atoi(optarg);
break;
case 'h':
usage(argv[0]);
exit(EXIT_SUCCESS);
break;
default:
break;
}
}
}
static int worker_thread(void *arg)
{
odp_queue_t queue;
struct test_timer *ttp;
odp_timeout_t tmo;
uint64_t tick;
uint64_t period;
uint64_t period_ns;
thread_args_t * thr_args = arg;
odp_thrmask_t mask;
uint32_t num_workers = gbl_args->num_workers;
int thr = odp_thread_id();
int cpu = odp_cpu_id();
if (thr <= gbl_args->appl.timeout_worker_count ) {
odp_thrmask_zero(&mask);
odp_thrmask_set(&mask, thr);
if (odp_schedule_group_join(gbl_args->schedule_group_timer, &mask)) {
printf("Join failed\n");
return -1;
}
}
//queue = odp_queue_lookup("timer_queue");
queue = thr_args->queue;
period_ns = 1000000 * ODP_TIME_USEC_IN_NS;
period = odp_timer_ns_to_tick(gbl_args->tp, period_ns);
ttp = &gbl_args->tt[thr];
ttp->tim = odp_timer_alloc(gbl_args->tp, queue, ttp);
if (ttp->tim == ODP_TIMER_INVALID) {
printf("Failed to allocate timer\n");
return -1;
}
tmo = odp_timeout_alloc(gbl_args->tmop);
if (tmo == ODP_TIMEOUT_INVALID) {
printf("Failed to allocate timeout\n");
return -1;
}
ttp->ev = odp_timeout_to_event(tmo);
tick = odp_timer_current_tick(gbl_args->tp);
while(1)
{
int wait = 0;
odp_event_t ev;
odp_timer_set_t rc;
if (ttp) {
tick += period;
rc = odp_timer_set_abs(ttp->tim, tick, &ttp->ev);
if (odp_unlikely(rc != ODP_TIMER_SUCCESS)) {
printf("odp_timer_set_abs failed.\n");
return -1;
}
}
/* Get the next expired timeout.
* We invoke the scheduler in a loop with a timeout because
* we are not guaranteed to receive any more timeouts. The
* scheduler isn't guaranteeing fairness when scheduling
* buffers to threads.
* Use 1.5 second timeout for scheduler */
uint64_t sched_tmo =
odp_schedule_wait_time(1500000000ULL);
do {
ev = odp_schedule(&queue, sched_tmo);
/* Check if odp_schedule() timed out, possibly there
* * are no remaining timeouts to receive */
if (++wait > WAIT_NUM &&
odp_atomic_load_u32(&gbl_args->remain) < num_workers)
printf("At least one TMO was lost\n");
} while (ev == ODP_EVENT_INVALID &&
(int)odp_atomic_load_u32(&gbl_args->remain) > 0);
if (ev == ODP_EVENT_INVALID)
break; /* No more timeouts */
if (odp_event_type(ev) != ODP_EVENT_TIMEOUT) {
/* Not a default timeout event */
printf("Unexpected event type (%u) received\n",
odp_event_type(ev));
}
odp_timeout_t tmo = odp_timeout_from_event(ev);
tick = odp_timeout_tick(tmo);
ttp = odp_timeout_user_ptr(tmo);
ttp->ev = ev;
if (!odp_timeout_fresh(tmo)) {
/* Not the expected expiration tick, timer has
* * been reset or cancelled or freed */
printf("Unexpected timeout received (timer %lu, tick %lu)\n", odp_timer_to_u64(ttp->tim), tick);
}
printf(" [CPU %i, thread %i] timeout, tick %lu\n", cpu, thr, tick);
uint32_t rx_num = odp_atomic_fetch_dec_u32(&gbl_args->remain);
if (!rx_num)
printf("Unexpected timeout received (timer %lu, tick %lu)\n", odp_timer_to_u64(ttp->tim), tick);
else if (rx_num > num_workers)
continue;
odp_event_free(ttp->ev);
odp_timer_free(ttp->tim);
ttp = NULL;
}
return 0;
}
int main(int argc, char *argv[])
{
int i;
odp_instance_t instance;
int num_workers;
odp_cpumask_t cpumask;
char cpumaskstr[ODP_CPUMASK_STR_SIZE];
odph_odpthread_t thread_tbl[MAX_WORKERS];
int cpu;
odp_timer_pool_param_t tparams;
odp_pool_param_t params;
odp_queue_param_t param;
odp_shm_t shm = ODP_SHM_INVALID;
odp_thrmask_t zero;
UNUSE(argc);
UNUSE(argv);
/* Init ODP before calling anything else */
if (odp_init_global(&instance, NULL, NULL)) {
printf("Error: ODP global init failed.\n");
exit(EXIT_FAILURE);
}
if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
printf("Error: ODP local init failed.\n");
exit(EXIT_FAILURE);
}
/* Reserve memory for args from shared mem */
shm = odp_shm_reserve("shm_args", sizeof(args_t),
ODP_CACHE_LINE_SIZE, 0);
gbl_args = odp_shm_addr(shm);
if (gbl_args == NULL) {
printf("Error: shared mem alloc failed.\n");
exit(EXIT_FAILURE);
}
/* Init global arguments */
gbl_args_init(gbl_args);
/* Parse and store the application arguments */
parse_args(argc, argv, &gbl_args->appl);
/* Default to system CPU count unless user specified */
num_workers = MAX_WORKERS;
if (gbl_args->appl.cpu_count) {
num_workers = gbl_args->appl.cpu_count;
}
if (gbl_args->appl.timeout_worker_count) {
if (gbl_args->appl.timeout_worker_count > gbl_args->appl.cpu_count) {
gbl_args->appl.timeout_worker_count = gbl_args->appl.cpu_count;
}
} else {
gbl_args->appl.timeout_worker_count = MAX_TIMEOUT_WORKERS;
}
/* Get default worker cpumask */
num_workers = odp_cpumask_default_worker(&cpumask, num_workers);
(void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr));
printf("num worker threads: %i\n", num_workers);
printf("num timeout worker threads: %i\n", gbl_args->appl.timeout_worker_count);
printf("first CPU: %i\n", odp_cpumask_first(&cpumask));
printf("cpu mask: %s\n", cpumaskstr);
gbl_args->num_workers = num_workers;
/* Initialize number of timeouts to receive */
odp_atomic_init_u32(&gbl_args->remain, NUM_TMOS * num_workers);
odp_barrier_init(&gbl_args->barrier, num_workers);
/* Create timeout pool */
memset(¶ms, 0, sizeof(params));
params.tmo.num = NUM_TMOS;
params.type = ODP_POOL_TIMEOUT;
gbl_args->tmop = odp_pool_create("timeout_pool", ¶ms);
if (gbl_args->tmop == ODP_POOL_INVALID) {
printf("Error: timeout pool create failed.\n");
exit(EXIT_FAILURE);
}
/* Create timer pool */
tparams.res_ns = 1 * ODP_TIME_MSEC_IN_NS;
tparams.min_tmo = 1 * ODP_TIME_MSEC_IN_NS;
tparams.max_tmo = 1000 * ODP_TIME_SEC_IN_NS;
tparams.num_timers = num_workers; /* One timer per worker */
tparams.priv = 0; /* Shared */
tparams.clk_src = ODP_CLOCK_CPU;
gbl_args->tp = odp_timer_pool_create("timer_pool", &tparams);
if (gbl_args->tp == ODP_TIMER_POOL_INVALID) {
printf("Timer pool create failed.\n");
exit(EXIT_FAILURE);
}
odp_timer_pool_start();
odp_thrmask_zero(&zero);
gbl_args->schedule_group_timer = odp_schedule_group_create(
"sche_group_timer", &zero);
if (gbl_args->schedule_group_timer == ODP_SCHED_GROUP_INVALID) {
printf("Group create failed\n");
exit(EXIT_FAILURE);
}
for (i=0; i<num_workers; i++) {
/* Create queue for each thread */
odp_queue_param_init(¶m);
param.type = ODP_QUEUE_TYPE_SCHED;
param.sched.prio = ODP_SCHED_PRIO_DEFAULT;
param.sched.sync = ODP_SCHED_SYNC_ORDERED;
//param.sched.group = ODP_SCHED_GROUP_ALL;
param.sched.group = gbl_args->schedule_group_timer;
gbl_args->thread[i].queue = odp_queue_create("timer_queue", ¶m);
if (gbl_args->thread[i].queue == ODP_QUEUE_INVALID) {
printf("Error: Timer queue for thread %i create failed.\n", i);
exit(EXIT_FAILURE);
}
}
/* Create worker threads */
cpu = odp_cpumask_first(&cpumask);
for (i = 0; i < num_workers; ++i) {
odp_cpumask_t thd_mask;
odph_odpthread_params_t thr_params;
memset(&thr_params, 0, sizeof(thr_params));
thr_params.start = worker_thread;
thr_params.arg = &gbl_args->thread[i];
thr_params.thr_type = ODP_THREAD_WORKER;
thr_params.instance = instance;
odp_cpumask_zero(&thd_mask);
odp_cpumask_set(&thd_mask, cpu);
odph_odpthreads_create(&thread_tbl[i], &thd_mask,
&thr_params);
cpu = odp_cpumask_next(&cpumask, cpu);
}
/* Master thread waits for other threads to exit */
for (i = 0; i < num_workers; ++i)
odph_odpthreads_join(&thread_tbl[i]);
if (odp_term_local()) {
printf("Error: term local\n");
exit(EXIT_FAILURE);
}
if (odp_term_global(instance)) {
printf("Error: term global\n");
exit(EXIT_FAILURE);
}
printf("Exit!\n\n");
return 0;
}