forked from iamscottxu/obs-rtspserver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadsafe_queue.h
126 lines (111 loc) · 2.65 KB
/
threadsafe_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#ifndef __THREADSAFE_QUEUE_H_
#define __THREADSAFE_QUEUE_H_
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
using namespace std;
/// @brief Unbounded multi-producer / multi-consumer FIFO queue.
///
/// Every access to the underlying std::queue is serialized by an internal
/// mutex; blocking consumers wait on a condition variable. Elements are stored
/// as std::shared_ptr<T>, allocated in push().
///
/// The queue can be switched into a "termination" state with termination():
///  - push() silently drops new elements,
///  - elements that are already queued can still be popped,
///  - wait_and_pop() never blocks; it reports failure once the queue is empty.
///
/// The class is neither copyable nor movable (it owns a mutex and a condition
/// variable).
template<typename T> class threadsafe_queue
{
public:
    threadsafe_queue()
        : m_bTermination(false) {
    }
    ~threadsafe_queue() = default;

    threadsafe_queue(const threadsafe_queue &) = delete;
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;

    /// @brief Dequeue the front element into @p value, blocking while the
    ///        queue is empty and termination() has not been called.
    ///
    /// (1) Before termination(): each call dequeues one element; blocks the
    ///     calling thread while the queue is empty.
    /// (2) After termination(): never blocks; threads already blocked here
    ///     are released.
    /// (3) @p value is only meaningful when true is returned.
    ///
    /// @param value Receives the dequeued element (move-assigned).
    /// @return true if an element was dequeued; false if the queue is
    ///         terminated and empty.
    bool wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] {
            return !data_queue.empty() ||
                   m_bTermination.load(std::memory_order_acquire);
        });
        // Woken by termination() with nothing left to hand out.
        if (data_queue.empty())
            return false;
        value = std::move(*data_queue.front());
        data_queue.pop();
        return true;
    }

    /// @brief Non-blocking dequeue into @p value.
    /// @param value Receives the dequeued element (move-assigned).
    /// @return false if the queue is empty, true otherwise.
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(*data_queue.front());
        data_queue.pop();
        return true;
    }

    /// @brief Blocking dequeue returning the stored pointer.
    ///
    /// Blocks while the queue is empty and termination() has not been called.
    ///
    /// @return The front element, or nullptr if the queue is terminated and
    ///         empty.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this] {
            return !data_queue.empty() ||
                   m_bTermination.load(std::memory_order_acquire);
        });
        if (data_queue.empty())
            return nullptr;
        // Move the pointer out instead of copying it: the slot is popped
        // right after, so there is no need to touch the reference count.
        std::shared_ptr<T> res = std::move(data_queue.front());
        data_queue.pop();
        return res;
    }

    /// @brief Non-blocking dequeue returning the stored pointer.
    /// @return The front element, or nullptr if the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return nullptr;
        std::shared_ptr<T> res = std::move(data_queue.front());
        data_queue.pop();
        return res;
    }

    /// @brief Enqueue one element and wake one waiting consumer.
    ///
    /// The element is dropped if the queue is in the termination state.
    ///
    /// @param new_value Element to enqueue (moved into the queue).
    void push(T new_value)
    {
        // Cheap early-out that avoids the allocation when already terminated.
        if (m_bTermination.load(std::memory_order_acquire))
            return;
        // Allocate before taking the lock to keep the critical section short.
        std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
        std::lock_guard<std::mutex> lk(mut);
        // Re-check under the lock: termination() sets the flag while holding
        // the same mutex, so without this check an element could be enqueued
        // after termination() has already completed.
        if (m_bTermination.load(std::memory_order_acquire))
            return;
        data_queue.push(std::move(data));
        data_cond.notify_one();
    }

    /// @return true if the queue currently holds no elements.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }

    /// @return Number of elements currently queued (converted to int).
    int size() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return static_cast<int>(data_queue.size());
    }

    /// @brief Put the queue into the termination state and wake all waiters.
    ///
    /// In this state pushes are ignored, pops still work, and wait_and_pop()
    /// does not block when the queue is empty.
    void termination()
    {
        std::lock_guard<std::mutex> lk(mut);
        m_bTermination.store(true, std::memory_order_release);
        data_cond.notify_all();
    }

    /// @return true if termination() has been called.
    bool is_termination() const
    {
        return m_bTermination.load(std::memory_order_acquire);
    }

private:
    mutable std::mutex mut;                    // guards data_queue; mutable so const observers can lock
    std::queue<std::shared_ptr<T>> data_queue; // FIFO storage
    std::condition_variable data_cond;         // signalled by push() and termination()
    std::atomic_bool m_bTermination;           // set once by termination(), never cleared
};
#endif