diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index cfb273b9058b..6264eb031067 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -49,6 +49,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init) : name(init.name) , priority(init.priority) + , max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores()) , thread_pool(std::make_unique( init.metric_threads, init.metric_active_threads, @@ -56,17 +57,16 @@ AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init) /* max_threads = */ std::numeric_limits::max(), // Unlimited number of threads, we do worker management ourselves /* max_free_threads = */ 0, // We do not require free threads /* queue_size = */0)) // Unlimited queue to avoid blocking during worker spawning - , max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores()) {} AsyncLoader::Pool::Pool(Pool&& o) noexcept : name(o.name) , priority(o.priority) - , thread_pool(std::move(o.thread_pool)) , ready_queue(std::move(o.ready_queue)) , max_threads(o.max_threads) , workers(o.workers) , suspended_workers(o.suspended_workers.load()) // All these constructors are needed because std::atomic is neither copy-constructible, nor move-constructible. We never move pools after init, so it is safe. + , thread_pool(std::move(o.thread_pool)) {} void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel) diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h index 42707a4ee912..05b809aceae1 100644 --- a/src/Common/AsyncLoader.h +++ b/src/Common/AsyncLoader.h @@ -365,11 +365,11 @@ class AsyncLoader : private boost::noncopyable { const String name; const Priority priority; - std::unique_ptr thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools. std::map ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno` size_t max_threads; // Max number of workers to be spawn size_t workers = 0; // Number of currently executing workers std::atomic suspended_workers{0}; // Number of workers that are blocked by `wait()` call on a job executing in the same pool (for deadlock resolution) + std::unique_ptr thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools. explicit Pool(const PoolInitializer & init); Pool(Pool&& o) noexcept;