diff --git a/kernel/threading.cc b/kernel/threading.cc index 3766c4ddf..817a48df0 100644 --- a/kernel/threading.cc +++ b/kernel/threading.cc @@ -110,7 +110,7 @@ ParallelDispatchThreadPool::~ParallelDispatchThreadPool() if (num_worker_threads_ == 0) return; current_work = nullptr; - num_active_worker_threads_ = num_worker_threads_; + num_active_worker_threads_.store(num_worker_threads_, std::memory_order_relaxed); signal_workers_start(); wait_for_workers_done(); #endif @@ -119,15 +119,16 @@ ParallelDispatchThreadPool::~ParallelDispatchThreadPool() void ParallelDispatchThreadPool::run(std::function work, int max_threads) { Multithreading multithreading; - num_active_worker_threads_ = num_threads(max_threads) - 1; - if (num_active_worker_threads_ == 0) { + int num_active_worker_threads = num_threads(max_threads) - 1; + if (num_active_worker_threads == 0) { work({{0}, 1}); return; } #ifdef YOSYS_ENABLE_THREADS + num_active_worker_threads_.store(num_active_worker_threads, std::memory_order_relaxed); current_work = &work; signal_workers_start(); - work({{0}, num_active_worker_threads_ + 1}); + work({{0}, num_active_worker_threads + 1}); wait_for_workers_done(); #endif } @@ -140,7 +141,8 @@ void ParallelDispatchThreadPool::run_worker(int thread_num) worker_wait_for_start(thread_num); if (current_work == nullptr) break; - (*current_work)({{thread_num + 1}, num_active_worker_threads_ + 1}); + int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed); + (*current_work)({{thread_num + 1}, num_active_worker_threads + 1}); signal_worker_done(); } signal_worker_done(); diff --git a/kernel/threading.h b/kernel/threading.h index fb1602cf5..a041def2c 100644 --- a/kernel/threading.h +++ b/kernel/threading.h @@ -236,8 +236,15 @@ private: std::function *current_work = nullptr; // Keeps a correct count even when threads are exiting. int num_worker_threads_; - // The count of active workerthreads for the current `run()`. 
- int num_active_worker_threads_ = 0; + // The count of active worker threads for the current `run()`. + // This is only written by the main thread, and only written when + // no other worker threads are running (i.e. all worker threads have + // passed the increment of `done_workers` in `signal_worker_done()` + // and not passed the release of the lock in `worker_wait_for_start()`). + // Although there can't be any races, we still need to make it atomic + // to prevent the compiler reordering accesses so the above invariant + // is maintained. + std::atomic<int> num_active_worker_threads_ = 0; #ifdef YOSYS_ENABLE_THREADS // Not especially efficient for large numbers of threads. Worker wakeup could scale @@ -248,7 +255,8 @@ private: std::vector main_to_workers_signal; void signal_workers_start() { std::unique_lock lock(main_to_workers_signal_mutex); - std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads_, 1); + int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed); + std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads, 1); // When `num_active_worker_threads_` is small compared to `num_worker_threads_`, we have a "thundering herd" // problem here. Fixing that would add complexity so don't worry about it for now. main_to_workers_signal_cv.notify_all(); @@ -263,15 +271,23 @@ private: std::mutex workers_to_main_signal_mutex; std::condition_variable workers_to_main_signal_cv; void signal_worker_done() { + // Must read `num_active_worker_threads_` before we increment `d`! Otherwise + // it is possible we would increment `d`, and then another worker signals the + // main thread that all workers are done, and the main thread writes to + // `num_active_worker_threads_` before we check it. 
+ int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed); int d = done_workers.fetch_add(1, std::memory_order_release); - if (d + 1 == num_active_worker_threads_) { + if (d + 1 == num_active_worker_threads) { std::unique_lock lock(workers_to_main_signal_mutex); workers_to_main_signal_cv.notify_all(); } } void wait_for_workers_done() { std::unique_lock lock(workers_to_main_signal_mutex); - workers_to_main_signal_cv.wait(lock, [this] { return done_workers.load(std::memory_order_acquire) == num_active_worker_threads_; }); + workers_to_main_signal_cv.wait(lock, [this] { + int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed); + return done_workers.load(std::memory_order_acquire) == num_active_worker_threads; + }); done_workers.store(0, std::memory_order_relaxed); } #endif