Mirror of https://github.com/YosysHQ/yosys, synced 2026-02-14 12:51:48 +00:00
Add ParallelDispatchThreadPool
We'll use this later in the PR.
This commit is contained in:
parent fb24763a15
commit 0004708177
2 changed files with 162 additions and 0 deletions
@@ -92,4 +92,59 @@ IntRange item_range_for_worker(int num_items, int thread_num, int num_threads)
	return {start, end};
}

ParallelDispatchThreadPool::ParallelDispatchThreadPool(int pool_size)
	: num_worker_threads_(std::max(1, pool_size) - 1)
{
#ifdef YOSYS_ENABLE_THREADS
	main_to_workers_signal.resize(num_worker_threads_, 0);
#endif
	// Don't start the threads until we've constructed all our data members.
	thread_pool = std::make_unique<ThreadPool>(num_worker_threads_, [this](int thread_num){
		run_worker(thread_num);
	});
}

ParallelDispatchThreadPool::~ParallelDispatchThreadPool()
{
#ifdef YOSYS_ENABLE_THREADS
	if (num_worker_threads_ == 0)
		return;
	current_work = nullptr;
	num_active_worker_threads_ = num_worker_threads_;
	signal_workers_start();
	wait_for_workers_done();
#endif
}

void ParallelDispatchThreadPool::run(std::function<void(const RunCtx &)> work, int max_threads)
{
	Multithreading multithreading;
	num_active_worker_threads_ = num_threads(max_threads) - 1;
	if (num_active_worker_threads_ == 0) {
		work({{0}, 1});
		return;
	}
#ifdef YOSYS_ENABLE_THREADS
	current_work = &work;
	signal_workers_start();
	work({{0}, num_active_worker_threads_ + 1});
	wait_for_workers_done();
#endif
}

void ParallelDispatchThreadPool::run_worker(int thread_num)
{
#ifdef YOSYS_ENABLE_THREADS
	while (true)
	{
		worker_wait_for_start(thread_num);
		if (current_work == nullptr)
			break;
		(*current_work)({{thread_num + 1}, num_active_worker_threads_ + 1});
		signal_worker_done();
	}
	signal_worker_done();
#endif
}

YOSYS_NAMESPACE_END
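For orientation, a minimal sketch of how a caller might drive this pool. It is not part of this commit: the pool size of 4, the `Item` type, `collect_items()`, `process_item()`, and the assumption that `IntRange` exposes `start`/`end` members are illustrative placeholders (in real callers the constructor argument is expected to come from a `pool_size()` call).

	ParallelDispatchThreadPool pool(4);
	std::vector<Item> items = collect_items();
	pool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
		// Each thread processes its own contiguous slice of the items.
		IntRange range = ctx.item_range((int)items.size());
		for (int i = range.start; i < range.end; i++)
			process_item(items[i]);
	});
	// run() returns only after every thread's closure has finished.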
@@ -183,6 +183,113 @@ struct ThreadIndex {
	int thread_num;
};

// A set of threads with a `run()` API that runs a closure on all of the threads
// and waits for all those closures to complete. This is a convenient way to implement
// parallel algorithms that use barrier synchronization.
class ParallelDispatchThreadPool
{
public:
	// Create a pool of threads running the given closure (parameterized by thread number).
	// `pool_size` must be the result of a `pool_size()` call.
	// `pool_size` can be zero, which we treat as 1.
	ParallelDispatchThreadPool(int pool_size);
	~ParallelDispatchThreadPool();

	// For each thread running a closure, a `RunCtx` is passed to the closure. Currently
	// it contains the thread index and the total number of threads. It can be passed
	// directly to any APIs requiring a `ThreadIndex`.
	struct RunCtx : public ThreadIndex {
		int num_threads;
		IntRange item_range(int num_items) const {
			return item_range_for_worker(num_items, thread_num, num_threads);
		}
	};

	// Sometimes we only want to activate a subset of the threads in the pool. This
	// class provides a way to do that. It provides the same `num_threads()`
	// and `run()` APIs as a `ParallelDispatchThreadPool`.
	class Subpool {
	public:
		Subpool(ParallelDispatchThreadPool &parent, int max_threads)
			: parent(parent), max_threads(max_threads) {}
		// Returns the number of threads that will be used when calling `run()`.
		int num_threads() const {
			return parent.num_threads(max_threads);
		}
		void run(std::function<void(const RunCtx &)> work) {
			parent.run(std::move(work), max_threads);
		}
		ParallelDispatchThreadPool &thread_pool() { return parent; }
	private:
		ParallelDispatchThreadPool &parent;
		int max_threads;
	};

	// Run the `work` function in parallel on each thread in the pool (parameterized by
	// thread number). Waits for all work functions to complete. Only one `run()` can be
	// active at a time.
	// Uses no more than `max_threads` threads (but at least one).
	void run(std::function<void(const RunCtx &)> work) {
		run(std::move(work), INT_MAX);
	}

	// Returns the number of threads that will be used when calling `run()`.
	int num_threads() const {
		return num_threads(INT_MAX);
	}
private:
	friend class Subpool;

	void run(std::function<void(const RunCtx &)> work, int max_threads);
	int num_threads(int max_threads) const {
		return std::min(num_worker_threads_ + 1, std::max(1, max_threads));
	}
	void run_worker(int thread_num);

	std::unique_ptr<ThreadPool> thread_pool;
	std::function<void(const RunCtx &)> *current_work = nullptr;
	// Keeps a correct count even when threads are exiting.
	int num_worker_threads_;
	// The count of active worker threads for the current `run()`.
	int num_active_worker_threads_ = 0;

#ifdef YOSYS_ENABLE_THREADS
	// Not especially efficient for large numbers of threads. Worker wakeup could scale
	// better by conceptually organising workers into a tree and having workers wake
	// up their children.
	std::mutex main_to_workers_signal_mutex;
	std::condition_variable main_to_workers_signal_cv;
	std::vector<uint8_t> main_to_workers_signal;
	void signal_workers_start() {
		std::unique_lock lock(main_to_workers_signal_mutex);
		std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads_, 1);
		// When `num_active_worker_threads_` is small compared to `num_worker_threads_`, we have a "thundering herd"
		// problem here. Fixing that would add complexity so don't worry about it for now.
		main_to_workers_signal_cv.notify_all();
	}
	void worker_wait_for_start(int thread_num) {
		std::unique_lock lock(main_to_workers_signal_mutex);
		main_to_workers_signal_cv.wait(lock, [this, thread_num] { return main_to_workers_signal[thread_num] > 0; });
		main_to_workers_signal[thread_num] = 0;
	}

	std::atomic<int> done_workers = 0;
	std::mutex workers_to_main_signal_mutex;
	std::condition_variable workers_to_main_signal_cv;
	void signal_worker_done() {
		int d = done_workers.fetch_add(1, std::memory_order_release);
		if (d + 1 == num_active_worker_threads_) {
			std::unique_lock lock(workers_to_main_signal_mutex);
			workers_to_main_signal_cv.notify_all();
		}
	}
	void wait_for_workers_done() {
		std::unique_lock lock(workers_to_main_signal_mutex);
		workers_to_main_signal_cv.wait(lock, [this] { return done_workers.load(std::memory_order_acquire) == num_active_worker_threads_; });
		done_workers.store(0, std::memory_order_relaxed);
	}
#endif
};
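// Illustrative usage sketch for `Subpool` (not part of this commit). `pool`,
// `tasks`, and `handle_task()` are hypothetical names, and `IntRange` is assumed
// to expose `start`/`end` members:
//
//	ParallelDispatchThreadPool::Subpool subpool(pool, /*max_threads=*/2);
//	subpool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
//		IntRange range = ctx.item_range((int)tasks.size());
//		for (int i = range.start; i < range.end; i++)
//			handle_task(tasks[i]);
//	});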
template <class T>
class ConcurrentStack
{