-
Notifications
You must be signed in to change notification settings - Fork 65
/
message_queue.h
162 lines (149 loc) · 5.55 KB
/
message_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
* Copyright (c) 2012 Jeremy Pepper
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of message_queue nor the names of its contributors may
* be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H
#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64
#endif
#include <semaphore.h>
/**
* \brief Message queue structure
*
* This structure is passed to all message_queue API calls
*/
struct message_queue {
unsigned int message_size;
unsigned int max_depth;
void *memory;
void **freelist;
void **queue_data;
struct {
sem_t *sem;
unsigned int blocked_readers;
int free_blocks;
unsigned int allocpos __attribute__((aligned(CACHE_LINE_SIZE)));
unsigned int freepos __attribute__((aligned(CACHE_LINE_SIZE)));
} allocator __attribute__((aligned(CACHE_LINE_SIZE)));
struct {
sem_t *sem;
unsigned int blocked_readers;
int entries;
unsigned int readpos __attribute__((aligned(CACHE_LINE_SIZE)));
unsigned int writepos __attribute__((aligned(CACHE_LINE_SIZE)));
} queue __attribute__((aligned(CACHE_LINE_SIZE)));
};
/**
* \brief Initialize a message queue structure
*
* This function must be called before any other message_queue API calls on a
* message queue structure.
*
* \param queue pointer to the message queue structure to initialize
* \param message_size size in bytes of the largest message that will be sent
* on this queue
* \param max_depth the maximum number of message to allow in the queue at
* once. This will be rounded to the next highest power of two.
*
* \return 0 if successful, or nonzero if an error occured
*/
int message_queue_init(struct message_queue *queue, int message_size, int max_depth);
/**
* \brief Allocate a new message
*
* This allocates message_size bytes to be used with this queue. Messages
* passed to the queue MUST be allocated with this function or with
* message_queue_message_alloc_blocking.
*
* \param queue pointer to the message queue to which the message will be
* written
* \return pointer to the allocated message, or NULL if no memory is available
*/
void *message_queue_message_alloc(struct message_queue *queue);
/**
* \brief Allocate a new message
*
* This allocates message_size bytes to be used with this queue. Messages
* passed to the queue MUST be allocated with this function or with
* message_queue_message_alloc. This function blocks until memory is
* available.
*
* \param queue pointer to the message queue to which the message will be
* written
* \return pointer to the allocated message
*/
void *message_queue_message_alloc_blocking(struct message_queue *queue);
/**
* \brief Free a message
*
* This returns the message to the queue's freelist to be reused to satisfy
* future allocations. This function MUST be used to free messages--they
* cannot be passed to free().
*
* \param queue pointer to the message queue from which the message was
* allocated
* \param message pointer to the message to be freed
*/
void message_queue_message_free(struct message_queue *queue, void *message);
/**
* \brief Write a message to the queue
*
* Messages must have been allocated from the same queue by
* message_queue_message_alloc to be passed to this function.
*
* \param queue pointer to the queue to which to write
* \param message pointer to the message to write to the queue
*/
void message_queue_write(struct message_queue *queue, void *message);
/**
* \brief Read a message from the queue if one is available
*
* \param queue pointer to the queue from which to read
* \return pointer to the next message on the queue, or NULL if no messages
* are available.
*/
void *message_queue_tryread(struct message_queue *queue);
/**
* \brief Read a message from the queue
*
* This reads a message from the queue, blocking if necessary until one is
* available.
*
* \param queue pointer to the queue from which to read
* \return pointer to the next message on the queue
*/
void *message_queue_read(struct message_queue *queue);
/**
* \brief Destroy a message queue structure
*
* This frees any resources associated with the message queue.
*
* \param queue pointer to the message queue to destroy
*/
void message_queue_destroy(struct message_queue *queue);
#endif