Skip to content

Commit

Permalink
added pool instant destruction flag and feedback methods
Browse files Browse the repository at this point in the history
Destroying without waiting may be wished too.
Feedback about workers number and queue size allows more user control like avoiding out-of-memory or creating tasks more fitting to the number of concurrent executions.
  • Loading branch information
Youka committed Jul 10, 2015
1 parent 51737c3 commit b2ac0d7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
26 changes: 20 additions & 6 deletions ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ freely, subject to the following restrictions:
class ThreadPool {
public:
// the constructor just launches some amount of workers
ThreadPool(size_t threads_n = std::thread::hardware_concurrency()) : stop(false)
ThreadPool(size_t threads_n = std::thread::hardware_concurrency(), const bool terminate = false) : stop(false), terminate(terminate)
{
if(!threads_n)
throw std::invalid_argument("more than zero threads expected");
Expand Down Expand Up @@ -77,6 +77,15 @@ class ThreadPool {
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
// number of workers
size_t size() const{
return this->workers.size();
}
// current number of tasks in queue
size_t pending(){
std::unique_lock<std::mutex> lock(this->queue_mutex);
return this->tasks.size();
}
// add new work item to the pool
template<class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args)
Expand All @@ -94,13 +103,15 @@ class ThreadPool {
this->condition.notify_one();
return res;
}
// the destructor joins all threads
// the destructor joins or terminates all threads
virtual ~ThreadPool()
{
this->stop = true;
this->condition.notify_all();
for(std::thread& worker : this->workers)
worker.join();
if(!this->terminate){
this->stop = true;
this->condition.notify_all();
for(std::thread& worker : this->workers)
worker.join();
}
}
private:
// need to keep track of threads so we can join them
Expand All @@ -113,6 +124,9 @@ class ThreadPool {
std::condition_variable condition;
// workers finalization flag
std::atomic_bool stop;

// immediate thread termination flag
const bool terminate;
};

#endif // THREAD_POOL_HPP
9 changes: 8 additions & 1 deletion example.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#include <iostream>
#include <vector>
#include <chrono>
#include <sstream>

#include "ThreadPool.hpp"

int main()
{

ThreadPool pool(4);
ThreadPool pool;
std::vector< std::future<int> > results;
std::ostringstream buf;

std::cout << "Workers: " << pool.size() << std::endl;

for(int i = 0; i < 8; ++i) {
results.emplace_back(
Expand All @@ -21,6 +25,9 @@ int main()
);
}

buf << '<' << pool.pending() << '>';
std::cout << buf.str();

for(auto && result: results)
std::cout << result.get() << ' ';
std::cout << std::endl;
Expand Down

0 comments on commit b2ac0d7

Please sign in to comment.