mirror of
https://github.com/YosysHQ/yosys
synced 2026-05-20 08:59:36 +00:00
Merge pull request #5875 from YosysHQ/emil/threading-fix-no-threads
threading: redirect locks to no-op on single-threaded builds
This commit is contained in:
commit
5c6de04467
3 changed files with 66 additions and 63 deletions
|
|
@ -66,11 +66,11 @@ ThreadPool::ThreadPool(int pool_size, std::function<void(int)> b)
|
|||
: body(std::move(b))
|
||||
{
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
threads.reserve(pool_size);
|
||||
for (int i = 0; i < pool_size; i++)
|
||||
threads.emplace_back([i, this]{ body(i); });
|
||||
threads.reserve(pool_size);
|
||||
for (int i = 0; i < pool_size; i++)
|
||||
threads.emplace_back([i, this]{ body(i); });
|
||||
#else
|
||||
log_assert(pool_size == 0);
|
||||
(void)pool_size;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
@ -96,11 +96,13 @@ IntRange item_range_for_worker(int num_items, int thread_num, int num_threads)
|
|||
}
|
||||
|
||||
ParallelDispatchThreadPool::ParallelDispatchThreadPool(int pool_size)
|
||||
: num_worker_threads_(std::max(1, pool_size) - 1)
|
||||
{
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
main_to_workers_signal.resize(num_worker_threads_, 0);
|
||||
: num_worker_threads_(std::max(1, pool_size) - 1)
|
||||
#else
|
||||
: num_worker_threads_(0)
|
||||
#endif
|
||||
{
|
||||
main_to_workers_signal.resize(num_worker_threads_, 0);
|
||||
// Don't start the threads until we've constructed all our data members.
|
||||
thread_pool = std::make_unique<ThreadPool>(num_worker_threads_, [this](int thread_num){
|
||||
run_worker(thread_num);
|
||||
|
|
@ -109,14 +111,12 @@ ParallelDispatchThreadPool::ParallelDispatchThreadPool(int pool_size)
|
|||
|
||||
ParallelDispatchThreadPool::~ParallelDispatchThreadPool()
|
||||
{
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
if (num_worker_threads_ == 0)
|
||||
return;
|
||||
current_work = nullptr;
|
||||
num_active_worker_threads_.store(num_worker_threads_, std::memory_order_relaxed);
|
||||
signal_workers_start();
|
||||
wait_for_workers_done();
|
||||
#endif
|
||||
}
|
||||
|
||||
void ParallelDispatchThreadPool::run(std::function<void(const RunCtx &)> work, int max_threads)
|
||||
|
|
@ -127,13 +127,11 @@ void ParallelDispatchThreadPool::run(std::function<void(const RunCtx &)> work, i
|
|||
work({{0}, 1});
|
||||
return;
|
||||
}
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
num_active_worker_threads_.store(num_active_worker_threads, std::memory_order_relaxed);
|
||||
current_work = &work;
|
||||
signal_workers_start();
|
||||
work({{0}, num_active_worker_threads + 1});
|
||||
wait_for_workers_done();
|
||||
#endif
|
||||
}
|
||||
|
||||
void ParallelDispatchThreadPool::run_worker(int thread_num)
|
||||
|
|
|
|||
|
|
@ -15,6 +15,33 @@
|
|||
|
||||
YOSYS_NAMESPACE_BEGIN
|
||||
|
||||
// Redirect to no-op to avoid dependence on <mutex>
|
||||
// and <condition_variable> in single-threaded builds
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
using Mutex = std::mutex;
|
||||
using CondVar = std::condition_variable;
|
||||
using UniqueLock = std::unique_lock<Mutex>;
|
||||
using LockGuard = std::lock_guard<Mutex>;
|
||||
#else
|
||||
struct Mutex {
|
||||
void lock() {}
|
||||
void unlock() {}
|
||||
bool try_lock() { return true; }
|
||||
};
|
||||
struct CondVar {
|
||||
template <class L> void wait(L &) {}
|
||||
template <class L, class P> void wait(L &, P) {}
|
||||
void notify_one() {}
|
||||
void notify_all() {}
|
||||
};
|
||||
struct UniqueLock {
|
||||
UniqueLock(Mutex &) {}
|
||||
};
|
||||
struct LockGuard {
|
||||
LockGuard(Mutex &) {}
|
||||
};
|
||||
#endif
|
||||
|
||||
// Concurrent queue implementation. Not fast, but simple.
|
||||
// Multi-producer, multi-consumer, optionally bounded.
|
||||
// When YOSYS_ENABLE_THREADS is not defined, this is just a non-thread-safe non-blocking deque.
|
||||
|
|
@ -27,26 +54,20 @@ public:
|
|||
// Push an element into the queue. If it's at capacity, block until there is room.
|
||||
void push_back(T t)
|
||||
{
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
UniqueLock lock(mutex);
|
||||
not_full_condition.wait(lock, [this] { return static_cast<int>(contents.size()) < capacity; });
|
||||
if (contents.empty())
|
||||
not_empty_condition.notify_one();
|
||||
#endif
|
||||
log_assert(!closed);
|
||||
contents.push_back(std::move(t));
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
if (static_cast<int>(contents.size()) < capacity)
|
||||
not_full_condition.notify_one();
|
||||
#endif
|
||||
}
|
||||
// Signal that no more elements will be produced. `pop_front()` will return nullopt.
|
||||
void close()
|
||||
{
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
UniqueLock lock(mutex);
|
||||
not_empty_condition.notify_all();
|
||||
#endif
|
||||
closed = true;
|
||||
}
|
||||
// Pop an element from the queue. Blocks until an element is available
|
||||
|
|
@ -62,39 +83,28 @@ public:
|
|||
return pop_front_internal(false);
|
||||
}
|
||||
private:
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::optional<T> pop_front_internal(bool wait)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
UniqueLock lock(mutex);
|
||||
if (wait) {
|
||||
not_empty_condition.wait(lock, [this] { return !contents.empty() || closed; });
|
||||
}
|
||||
#else
|
||||
std::optional<T> pop_front_internal(bool)
|
||||
{
|
||||
#endif
|
||||
if (contents.empty())
|
||||
return std::nullopt;
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
if (static_cast<int>(contents.size()) == capacity)
|
||||
not_full_condition.notify_one();
|
||||
#endif
|
||||
T result = std::move(contents.front());
|
||||
contents.pop_front();
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
if (!contents.empty())
|
||||
not_empty_condition.notify_one();
|
||||
#endif
|
||||
return std::move(result);
|
||||
}
|
||||
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::mutex mutex;
|
||||
Mutex mutex;
|
||||
// Signals one waiter thread when the queue changes and is not full.
|
||||
std::condition_variable not_full_condition;
|
||||
CondVar not_full_condition;
|
||||
// Signals one waiter thread when the queue changes and is not empty.
|
||||
std::condition_variable not_empty_condition;
|
||||
#endif
|
||||
CondVar not_empty_condition;
|
||||
std::deque<T> contents;
|
||||
int capacity;
|
||||
bool closed = false;
|
||||
|
|
@ -245,15 +255,14 @@ private:
|
|||
// is maintained.
|
||||
std::atomic<int> num_active_worker_threads_ = 0;
|
||||
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
// Not especially efficient for large numbers of threads. Worker wakeup could scale
|
||||
// better by conceptually organising workers into a tree and having workers wake
|
||||
// up their children.
|
||||
std::mutex main_to_workers_signal_mutex;
|
||||
std::condition_variable main_to_workers_signal_cv;
|
||||
Mutex main_to_workers_signal_mutex;
|
||||
CondVar main_to_workers_signal_cv;
|
||||
std::vector<uint8_t> main_to_workers_signal;
|
||||
void signal_workers_start() {
|
||||
std::unique_lock lock(main_to_workers_signal_mutex);
|
||||
UniqueLock lock(main_to_workers_signal_mutex);
|
||||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads, 1);
|
||||
// When `num_active_worker_threads_` is small compared to `num_worker_threads_`, we have a "thundering herd"
|
||||
|
|
@ -261,14 +270,14 @@ private:
|
|||
main_to_workers_signal_cv.notify_all();
|
||||
}
|
||||
void worker_wait_for_start(int thread_num) {
|
||||
std::unique_lock lock(main_to_workers_signal_mutex);
|
||||
UniqueLock lock(main_to_workers_signal_mutex);
|
||||
main_to_workers_signal_cv.wait(lock, [this, thread_num] { return main_to_workers_signal[thread_num] > 0; });
|
||||
main_to_workers_signal[thread_num] = 0;
|
||||
}
|
||||
|
||||
std::atomic<int> done_workers = 0;
|
||||
std::mutex workers_to_main_signal_mutex;
|
||||
std::condition_variable workers_to_main_signal_cv;
|
||||
Mutex workers_to_main_signal_mutex;
|
||||
CondVar workers_to_main_signal_cv;
|
||||
void signal_worker_done() {
|
||||
// Must read `num_active_worker_threads_` before we increment `d`! Otherwise
|
||||
// it is possible we would increment `d`, and then another worker signals the
|
||||
|
|
@ -277,19 +286,18 @@ private:
|
|||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
int d = done_workers.fetch_add(1, std::memory_order_release);
|
||||
if (d + 1 == num_active_worker_threads) {
|
||||
std::unique_lock lock(workers_to_main_signal_mutex);
|
||||
UniqueLock lock(workers_to_main_signal_mutex);
|
||||
workers_to_main_signal_cv.notify_all();
|
||||
}
|
||||
}
|
||||
void wait_for_workers_done() {
|
||||
std::unique_lock lock(workers_to_main_signal_mutex);
|
||||
UniqueLock lock(workers_to_main_signal_mutex);
|
||||
workers_to_main_signal_cv.wait(lock, [this] {
|
||||
int num_active_worker_threads = num_active_worker_threads_.load(std::memory_order_relaxed);
|
||||
return done_workers.load(std::memory_order_acquire) == num_active_worker_threads;
|
||||
});
|
||||
done_workers.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
#endif
|
||||
// Ensure `thread_pool` is destroyed before any other members,
|
||||
// forcing all threads to be joined before destroying the
|
||||
// members (e.g. workers_to_main_signal_mutex) they might be using.
|
||||
|
|
@ -301,15 +309,11 @@ class ConcurrentStack
|
|||
{
|
||||
public:
|
||||
void push_back(T &&t) {
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
#endif
|
||||
LockGuard lock(mutex);
|
||||
contents.push_back(std::move(t));
|
||||
}
|
||||
std::optional<T> try_pop_back() {
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
#endif
|
||||
LockGuard lock(mutex);
|
||||
if (contents.empty())
|
||||
return std::nullopt;
|
||||
T result = std::move(contents.back());
|
||||
|
|
@ -317,9 +321,7 @@ public:
|
|||
return result;
|
||||
}
|
||||
private:
|
||||
#ifdef YOSYS_ENABLE_THREADS
|
||||
std::mutex mutex;
|
||||
#endif
|
||||
Mutex mutex;
|
||||
std::vector<T> contents;
|
||||
};
|
||||
|
||||
|
|
@ -596,12 +598,12 @@ public:
|
|||
return;
|
||||
bool was_empty;
|
||||
{
|
||||
std::unique_lock lock(thread_state.batches_lock);
|
||||
UniqueLock lock(thread_state.batches_lock);
|
||||
was_empty = thread_state.batches.empty();
|
||||
thread_state.batches.push_back(std::move(thread_state.next_batch));
|
||||
}
|
||||
if (was_empty) {
|
||||
std::unique_lock lock(waiters_lock);
|
||||
UniqueLock lock(waiters_lock);
|
||||
if (num_waiters > 0) {
|
||||
waiters_cv.notify_one();
|
||||
}
|
||||
|
|
@ -617,7 +619,7 @@ public:
|
|||
return std::move(thread_state.next_batch);
|
||||
// Empty our own work queue first.
|
||||
{
|
||||
std::unique_lock lock(thread_state.batches_lock);
|
||||
UniqueLock lock(thread_state.batches_lock);
|
||||
if (!thread_state.batches.empty()) {
|
||||
std::vector<T> batch = std::move(thread_state.batches.back());
|
||||
thread_state.batches.pop_back();
|
||||
|
|
@ -634,8 +636,9 @@ public:
|
|||
// them will eventually enter this loop and there will be no further
|
||||
// notifications on waiters_cv, so all will eventually increment
|
||||
// num_waiters and wait, so num_waiters == num_threads()
|
||||
// will become true.
|
||||
std::unique_lock lock(waiters_lock);
|
||||
// will become true. In single-threaded builds, num_threads() is 1,
|
||||
// so we always terminate on the first iteration.
|
||||
UniqueLock lock(waiters_lock);
|
||||
++num_waiters;
|
||||
if (num_waiters == num_threads()) {
|
||||
waiters_cv.notify_all();
|
||||
|
|
@ -654,7 +657,7 @@ private:
|
|||
for (int i = 1; i < num_threads(); i++) {
|
||||
int other_thread_num = (thread.thread_num + i) % num_threads();
|
||||
ThreadState &other_thread_state = thread_states[other_thread_num];
|
||||
std::unique_lock lock(other_thread_state.batches_lock);
|
||||
UniqueLock lock(other_thread_state.batches_lock);
|
||||
if (!other_thread_state.batches.empty()) {
|
||||
std::vector<T> batch = std::move(other_thread_state.batches.front());
|
||||
other_thread_state.batches.pop_front();
|
||||
|
|
@ -670,15 +673,15 @@ private:
|
|||
// Entirely thread-local.
|
||||
std::vector<T> next_batch;
|
||||
|
||||
std::mutex batches_lock;
|
||||
Mutex batches_lock;
|
||||
// Only the associated thread ever adds to this, and only at the back.
|
||||
// Other threads can remove elements from the front.
|
||||
std::deque<std::vector<T>> batches;
|
||||
};
|
||||
std::vector<ThreadState> thread_states;
|
||||
|
||||
std::mutex waiters_lock;
|
||||
std::condition_variable waiters_cv;
|
||||
Mutex waiters_lock;
|
||||
CondVar waiters_cv;
|
||||
// Number of threads waiting for work. Their queues are empty.
|
||||
int num_waiters = 0;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -109,8 +109,10 @@ TEST_F(ThreadingTest, IntRangeIteration) {
|
|||
|
||||
TEST_F(ThreadingTest, IntRangeEmpty) {
|
||||
IntRange range{5, 5};
|
||||
for (int _ : range)
|
||||
for (int _ : range) {
|
||||
(void)_;
|
||||
FAIL();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ThreadingTest, ItemRangeForWorker) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue