mirror of
https://github.com/YosysHQ/yosys
synced 2026-03-28 07:15:48 +00:00
Prevent race on num_active_worker_threads_.
The core issue here is that we need to ensure `num_active_worker_threads_` is read before incrementing `done_workers`. See the comments added in this PR to explain why, and why the resulting code is race-free.
This commit is contained in:
parent
66306a8ca3
commit
290fb0556d
2 changed files with 28 additions and 10 deletions
|
|
@ -110,7 +110,7 @@ ParallelDispatchThreadPool::~ParallelDispatchThreadPool()
|
|||
if (num_worker_threads_ == 0)
|
||||
return;
|
||||
current_work = nullptr;
|
||||
num_active_worker_threads_ = num_worker_threads_;
|
||||
num_active_worker_threads_.store(num_worker_threads_, std::memory_order_relaxed);
|
||||
signal_workers_start();
|
||||
wait_for_workers_done();
|
||||
#endif
|
||||
|
|
@ -119,15 +119,16 @@ ParallelDispatchThreadPool::~ParallelDispatchThreadPool()
|
|||
void ParallelDispatchThreadPool::run(std::function<void(const RunCtx &)> work, int max_threads)
|
||||
{
|
||||
Multithreading multithreading;
|
||||
num_active_worker_threads_ = num_threads(max_threads) - 1;
|
||||
if (num_active_worker_threads_ == 0) {
|
||||
int num_active_worker_threads = num_threads(max_threads) - 1;
|
||||
if (num_active_worker_threads == 0) {
|
||||
work({{0}, 1});
|
||||
return;
|
||||
}
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
num_active_worker_threads_.store(num_active_worker_threads, std::memory_order_relaxed);
|
||||
current_work = &work;
|
||||
signal_workers_start();
|
||||
work({{0}, num_active_worker_threads_ + 1});
|
||||
work({{0}, num_active_worker_threads + 1});
|
||||
wait_for_workers_done();
|
||||
#endif
|
||||
}
|
||||
|
|
@ -140,7 +141,8 @@ void ParallelDispatchThreadPool::run_worker(int thread_num)
|
|||
worker_wait_for_start(thread_num);
|
||||
if (current_work == nullptr)
|
||||
break;
|
||||
(*current_work)({{thread_num + 1}, num_active_worker_threads_ + 1});
|
||||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
(*current_work)({{thread_num + 1}, num_active_worker_threads + 1});
|
||||
signal_worker_done();
|
||||
}
|
||||
signal_worker_done();
|
||||
|
|
|
|||
|
|
@ -236,8 +236,15 @@ private:
|
|||
std::function<void(const RunCtx &)> *current_work = nullptr;
|
||||
// Keeps a correct count even when threads are exiting.
|
||||
int num_worker_threads_;
|
||||
// The count of active workerthreads for the current `run()`.
|
||||
int num_active_worker_threads_ = 0;
|
||||
// The count of active worker threads for the current `run()`.
|
||||
// This is only written by the main thread, and only written when
|
||||
// no other worker threads are running (i.e. all worker threads have
|
||||
// passed the increment of `done_workers` in `signal_worker_done()`
|
||||
// and not passed the release of the lock in `worker_wait_for_start()`).
|
||||
// Although there can't be any races, we still need to make it atomic
|
||||
// to prevent the compiler reordering accesses so the above invariant
|
||||
// is maintained.
|
||||
std::atomic<int> num_active_worker_threads_ = 0;
|
||||
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
// Not especially efficient for large numbers of threads. Worker wakeup could scale
|
||||
|
|
@ -248,7 +255,8 @@ private:
|
|||
std::vector<uint8_t> main_to_workers_signal;
|
||||
void signal_workers_start() {
|
||||
std::unique_lock lock(main_to_workers_signal_mutex);
|
||||
std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads_, 1);
|
||||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads, 1);
|
||||
// When `num_active_worker_threads_` is small compared to `num_worker_threads_`, we have a "thundering herd"
|
||||
// problem here. Fixing that would add complexity so don't worry about it for now.
|
||||
main_to_workers_signal_cv.notify_all();
|
||||
|
|
@ -263,15 +271,23 @@ private:
|
|||
std::mutex workers_to_main_signal_mutex;
|
||||
std::condition_variable workers_to_main_signal_cv;
|
||||
void signal_worker_done() {
|
||||
// Must read `num_active_worker_threads_` before we increment `done_workers`! Otherwise
|
||||
// it is possible we would increment `done_workers`, and then another worker signals the
|
||||
// main thread that all workers are done, and the main thread writes to
|
||||
// `num_active_worker_threads_` before we check it.
|
||||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
int d = done_workers.fetch_add(1, std::memory_order_release);
|
||||
if (d + 1 == num_active_worker_threads_) {
|
||||
if (d + 1 == num_active_worker_threads) {
|
||||
std::unique_lock lock(workers_to_main_signal_mutex);
|
||||
workers_to_main_signal_cv.notify_all();
|
||||
}
|
||||
}
|
||||
void wait_for_workers_done() {
|
||||
std::unique_lock lock(workers_to_main_signal_mutex);
|
||||
workers_to_main_signal_cv.wait(lock, [this] { return done_workers.load(std::memory_order_acquire) == num_active_worker_threads_; });
|
||||
workers_to_main_signal_cv.wait(lock, [this] {
|
||||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
return done_workers.load(std::memory_order_acquire) == num_active_worker_threads;
|
||||
});
|
||||
done_workers.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue