#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
// 实现自己的线程池,可设置线程数目、任务优先级(优先级从0到9,数字越高优先级越高),可正常关闭
class ThreadPool {
public:
ThreadPool(int size) {
for (int i = 0; i < size; i++) {
AddThread();
}
}
~ThreadPool() {
{
unique_lock<mutex> lck(mtx_);
is_stop_ = true;
}
cv_.notify_all();
for (auto& tid : pool_)
if (tid.joinable()) tid.join();
}
void AddThread() {
pool_.emplace_back([this]() {
while (true) {
Task task;
{
unique_lock<mutex> lck(mtx_);
cv_.wait(lck, [this]() { return !task_.empty() || is_stop_; });
if (is_stop_ && task_.empty()) return;
task = move(task_.top());
task_.pop();
}
task.func();
}
});
}
template <typename F, typename... Args>
void PutTask(int priority, F&& f, Args&&... args) {
auto func = bind(forward<F>(f), forward<Args>(args)...);
{
unique_lock<mutex> lck(mtx_);
task_.emplace(priority, func);
}
cv_.notify_one();
}
private:
struct Task {
int priority;
function<void()> func;
Task() = default;
Task(int p, function<void()> f) : priority(p), func(move(f)) {}
bool operator<(const Task& other) const {
return priority < other.priority;
}
};
priority_queue<Task> task_;
vector<thread> pool_;
int size_{0};
bool is_stop_{false};
condition_variable cv_;
mutex mtx_;
};
void ThreadPoolTest() {
cout << "------------ThreadPoolTest Start------------" << endl;
ThreadPool tp(2);
for (int i = 0; i < 10; i++) {
tp.PutTask(
i,
[](int num) {
this_thread::sleep_for(chrono::milliseconds(100));
cout << "Tid: " << this_thread::get_id() << " Priority: " << num
<< endl;
},
i);
}
cout << "------------ThreadPoolTest End------------" << endl;
}