diff --git a/kernel/threading.h b/kernel/threading.h
index c1897de10..3a5f5e820 100644
--- a/kernel/threading.h
+++ b/kernel/threading.h
@@ -562,6 +562,112 @@ private:
 	std::vector<Shard> shards;
 };
 
+// A concurrent work-queue that can share batches of work across threads.
+// Uses a naive implementation of work-stealing.
+template <typename T>
+class ConcurrentWorkQueue {
+public:
+	// Create a queue that supports the given number of threads and
+	// groups work into `batch_size` units.
+	ConcurrentWorkQueue(int num_threads, int batch_size = 100)
+		: batch_size(batch_size), thread_states(num_threads) {}
+	int num_threads() const { return GetSize(thread_states); }
+	// Push some work to do. Pushes and pops with the same `thread` must
+	// not happen concurrently.
+	void push(const ThreadIndex &thread, T work) {
+		ThreadState &thread_state = thread_states[thread.thread_num];
+		thread_state.next_batch.emplace_back(std::move(work));
+		if (GetSize(thread_state.next_batch) < batch_size)
+			return;
+		bool was_empty;
+		{
+			std::unique_lock lock(thread_state.batches_lock);
+			was_empty = thread_state.batches.empty();
+			thread_state.batches.push_back(std::move(thread_state.next_batch));
+		}
+		if (was_empty) {
+			std::unique_lock lock(waiters_lock);
+			if (num_waiters > 0) {
+				waiters_cv.notify_one();
+			}
+		}
+	}
+	// Grab some work to do.
+	// If all threads enter `pop_batch()`, then instead of deadlocking the
+	// queue will return no work. That is the only case in which it will
+	// return no work.
+	std::vector<T> pop_batch(const ThreadIndex &thread) {
+		ThreadState &thread_state = thread_states[thread.thread_num];
+		if (!thread_state.next_batch.empty())
+			return std::move(thread_state.next_batch);
+		// Empty our own work queue first.
+		{
+			std::unique_lock lock(thread_state.batches_lock);
+			if (!thread_state.batches.empty()) {
+				std::vector<T> batch = std::move(thread_state.batches.back());
+				thread_state.batches.pop_back();
+				return batch;
+			}
+		}
+		// From here on in this function, our work queue is empty.
+		while (true) {
+			std::vector<T> batch = try_steal(thread);
+			if (!batch.empty()) {
+				return std::move(batch);
+			}
+			// Termination: if all threads run out of work, then all of
+			// them will eventually enter this loop and there will be no further
+			// notifications on waiters_cv, so all will eventually increment
+			// num_waiters and wait, so num_waiters == num_threads()
+			// will become true.
+			std::unique_lock lock(waiters_lock);
+			++num_waiters;
+			if (num_waiters == num_threads()) {
+				waiters_cv.notify_all();
+				return {};
+			}
+			// As above, it's possible that we'll wait here even when there
+			// are work batches posted by other threads. That's OK.
+			waiters_cv.wait(lock);
+			if (num_waiters == num_threads())
+				return {};
+			--num_waiters;
+		}
+	}
+private:
+	std::vector<T> try_steal(const ThreadIndex &thread) {
+		for (int i = 1; i < num_threads(); i++) {
+			int other_thread_num = (thread.thread_num + i) % num_threads();
+			ThreadState &other_thread_state = thread_states[other_thread_num];
+			std::unique_lock lock(other_thread_state.batches_lock);
+			if (!other_thread_state.batches.empty()) {
+				std::vector<T> batch = std::move(other_thread_state.batches.front());
+				other_thread_state.batches.pop_front();
+				return batch;
+			}
+		}
+		return {};
+	}
+
+	int batch_size;
+
+	struct ThreadState {
+		// Entirely thread-local.
+		std::vector<T> next_batch;
+
+		std::mutex batches_lock;
+		// Only the associated thread ever adds to this, and only at the back.
+		// Other threads can remove elements from the front.
+		std::deque<std::vector<T>> batches;
+	};
+	std::vector<ThreadState> thread_states;
+
+	std::mutex waiters_lock;
+	std::condition_variable waiters_cv;
+	// Number of threads waiting for work. Their queues are empty.
+	int num_waiters = 0;
+};
+
 YOSYS_NAMESPACE_END
 
 #endif // YOSYS_THREADING_H
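
Usage note (reviewer sketch, not part of the patch): a typical driver loop pairs
pop_batch() with push() so that each worker both drains and generates work, which
is the pattern the termination protocol above is built for. The sketch below is
illustrative only; the work item type `Task` and the `process()` helper are
hypothetical stand-ins, and the only assumption about Yosys internals is that
`ThreadIndex` exposes the `thread_num` member the implementation above indexes
`thread_states` with.

	void worker(ConcurrentWorkQueue<Task> &queue, const ThreadIndex &thread)
	{
		while (true) {
			// An empty batch is returned only once every thread is idle,
			// so it doubles as the collective termination signal.
			std::vector<Task> batch = queue.pop_batch(thread);
			if (batch.empty())
				return;
			for (Task &task : batch) {
				// process() is a stand-in for real work; any follow-up
				// items it discovers go back through the same `thread`
				// index, which is safe because this thread never calls
				// push() and pop_batch() concurrently with itself.
				for (Task &follow_up : process(task))
					queue.push(thread, std::move(follow_up));
			}
		}
	}

One consequence of the termination protocol worth noting: a worker that stops
calling pop_batch() before receiving an empty batch would stall the remaining
threads, since num_waiters could then never reach num_threads(). Every
participating thread must keep polling until the queue reports no work.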