From 9f0d5d1aca06fde2f8529fe13be9ae0c8124ff87 Mon Sep 17 00:00:00 2001 From: Robert O'Callahan Date: Sat, 6 Sep 2025 04:20:14 +0000 Subject: [PATCH] Add thread support code Provides very simple ConcurrentQueue and ThreadPool classes that build even when threading is disabled. Also provides a `DeferredLogs` class that captures log output to be replayed on the main thread later. --- Makefile | 14 +++- kernel/log.cc | 7 ++ kernel/log.h | 1 + kernel/threading.cc | 45 ++++++++++ kernel/threading.h | 186 ++++++++++++++++++++++++++++++++++++++++++ kernel/yosys.cc | 2 +- misc/create_vcxsrc.sh | 1 + 7 files changed, 254 insertions(+), 2 deletions(-) create mode 100644 kernel/threading.cc create mode 100644 kernel/threading.h diff --git a/Makefile b/Makefile index 5b362a652..ed6eb3bf3 100644 --- a/Makefile +++ b/Makefile @@ -44,7 +44,12 @@ LINK_ABC := 0 # Needed for environments that can't run executables (i.e. emscripten, wasm) DISABLE_SPAWN := 0 # Needed for environments that don't have proper thread support (i.e. emscripten, wasm--for now) +ENABLE_THREADS := 1 +ifeq ($(ENABLE_THREADS),1) DISABLE_ABC_THREADS := 0 +else +DISABLE_ABC_THREADS := 1 +endif # clang sanitizers SANITIZER = @@ -300,6 +305,7 @@ DISABLE_SPAWN := 1 ifeq ($(ENABLE_ABC),1) LINK_ABC := 1 +ENABLE_THREADS := 0 DISABLE_ABC_THREADS := 1 endif @@ -457,6 +463,11 @@ CXXFLAGS := -Og -DDEBUG $(filter-out $(OPT_LEVEL),$(CXXFLAGS)) STRIP := endif +ifeq ($(ENABLE_THREADS),1) +CXXFLAGS += -DYOSYS_ENABLE_THREADS +LIBS += -lpthread +endif + ifeq ($(ENABLE_ABC),1) CXXFLAGS += -DYOSYS_ENABLE_ABC ifeq ($(LINK_ABC),1) @@ -612,6 +623,7 @@ $(eval $(call add_include_file,kernel/satgen.h)) $(eval $(call add_include_file,kernel/scopeinfo.h)) $(eval $(call add_include_file,kernel/sexpr.h)) $(eval $(call add_include_file,kernel/sigtools.h)) +$(eval $(call add_include_file,kernel/threading.h)) $(eval $(call add_include_file,kernel/timinginfo.h)) $(eval $(call add_include_file,kernel/utils.h)) $(eval $(call add_include_file,kernel/yosys.h)) @@ -635,7 +647,7 @@ OBJS += kernel/driver.o kernel/register.o kernel/rtlil.o kernel/log.o kernel/cal OBJS += kernel/log_help.o OBJS += kernel/binding.o kernel/tclapi.o OBJS += kernel/cellaigs.o kernel/celledges.o kernel/cost.o kernel/satgen.o kernel/scopeinfo.o kernel/qcsat.o kernel/mem.o kernel/ffmerge.o kernel/ff.o kernel/yw.o kernel/json.o kernel/fmt.o kernel/sexpr.o -OBJS += kernel/drivertools.o kernel/functional.o +OBJS += kernel/drivertools.o kernel/functional.o kernel/threading.o ifeq ($(ENABLE_ZLIB),1) OBJS += kernel/fstdata.o endif diff --git a/kernel/log.cc b/kernel/log.cc index 011071439..17869f7f0 100644 --- a/kernel/log.cc +++ b/kernel/log.cc @@ -639,6 +639,13 @@ void log_dump_val_worker(RTLIL::State v) { log("%s", log_signal(v)); } +std::string signal_str(const RTLIL::SigSpec &sig, bool autoint) +{ + std::stringstream buf; + RTLIL_BACKEND::dump_sigspec(buf, sig, autoint); + return buf.str(); +} + const char *log_signal(const RTLIL::SigSpec &sig, bool autoint) { std::stringstream buf; diff --git a/kernel/log.h b/kernel/log.h index 48997d250..41973384c 100644 --- a/kernel/log.h +++ b/kernel/log.h @@ -204,6 +204,7 @@ extern dict log_expect_log, log_expect_warning, lo extern dict log_expect_prefix_log, log_expect_prefix_warning, log_expect_prefix_error; void log_check_expected(); +std::string signal_str(const RTLIL::SigSpec &sig, bool autoint = true); const char *log_signal(const RTLIL::SigSpec &sig, bool autoint = true); const char *log_const(const RTLIL::Const &value, bool autoint = true); const char *log_id(const RTLIL::IdString &id); diff --git a/kernel/threading.cc b/kernel/threading.cc new file mode 100644 index 000000000..49fddaa7c --- /dev/null +++ b/kernel/threading.cc @@ -0,0 +1,45 @@ +#include "kernel/yosys_common.h" +#include "kernel/threading.h" + +YOSYS_NAMESPACE_BEGIN + +void DeferredLogs::flush() +{ + for (auto &m : logs) + if (m.error) + YOSYS_NAMESPACE_PREFIX log_error("%s", m.text.c_str()); + else + YOSYS_NAMESPACE_PREFIX log("%s", m.text.c_str()); +} + +int ThreadPool::pool_size(int reserved_cores, int max_threads) +{ +#ifdef YOSYS_ENABLE_THREADS + int num_threads = std::min(std::thread::hardware_concurrency() - reserved_cores, max_threads); + return std::max(0, num_threads); +#else + return 0; +#endif +} + +ThreadPool::ThreadPool(int pool_size, std::function b) + : body(std::move(b)) +{ +#ifdef YOSYS_ENABLE_THREADS + threads.reserve(pool_size); + for (int i = 0; i < pool_size; i++) + threads.emplace_back([i, this]{ body(i); }); +#else + log_assert(pool_size == 0); +#endif +} + +ThreadPool::~ThreadPool() +{ +#ifdef YOSYS_ENABLE_THREADS + for (auto &t : threads) + t.join(); +#endif +} + +YOSYS_NAMESPACE_END diff --git a/kernel/threading.h b/kernel/threading.h new file mode 100644 index 000000000..c34abf850 --- /dev/null +++ b/kernel/threading.h @@ -0,0 +1,186 @@ +#include + +#ifdef YOSYS_ENABLE_THREADS +#include +#include +#include +#endif + +#include "kernel/yosys_common.h" +#include "kernel/log.h" + +#ifndef YOSYS_THREADING_H +#define YOSYS_THREADING_H + +YOSYS_NAMESPACE_BEGIN + +// Concurrent queue implementation. Not fast, but simple. +// Multi-producer, multi-consumer, optionally bounded. +// When YOSYS_ENABLE_THREADS is not defined, this is just a non-thread-safe non-blocking deque. +template +class ConcurrentQueue +{ +public: + ConcurrentQueue(int capacity = INT_MAX) + : capacity(capacity) {} + // Push an element into the queue. If it's at capacity, block until there is room. + void push_back(T t) + { +#ifdef YOSYS_ENABLE_THREADS + std::unique_lock lock(mutex); + not_full_condition.wait(lock, [this] { return static_cast(contents.size()) < capacity; }); + if (contents.empty()) + not_empty_condition.notify_one(); +#endif + log_assert(!closed); + contents.push_back(std::move(t)); +#ifdef YOSYS_ENABLE_THREADS + if (static_cast(contents.size()) < capacity) + not_full_condition.notify_one(); +#endif + } + // Signal that no more elements will be produced. `pop_front()` will return nullopt. + void close() + { +#ifdef YOSYS_ENABLE_THREADS + std::unique_lock lock(mutex); + not_empty_condition.notify_all(); +#endif + closed = true; + } + // Pop an element from the queue. Blocks until an element is available + // or the queue is closed and empty. + std::optional pop_front() + { + return pop_front_internal(true); + } + // Pop an element from the queue. Does not block, just returns nullopt if the + // queue is empty. + std::optional try_pop_front() + { + return pop_front_internal(false); + } +private: +#ifdef YOSYS_ENABLE_THREADS + std::optional pop_front_internal(bool wait) + { + std::unique_lock lock(mutex); + if (wait) { + not_empty_condition.wait(lock, [this] { return !contents.empty() || closed; }); + } +#else + std::optional pop_front_internal(bool) + { +#endif + if (contents.empty()) + return std::nullopt; +#ifdef YOSYS_ENABLE_THREADS + if (static_cast(contents.size()) == capacity) + not_full_condition.notify_one(); +#endif + T result = std::move(contents.front()); + contents.pop_front(); +#ifdef YOSYS_ENABLE_THREADS + if (!contents.empty()) + not_empty_condition.notify_one(); +#endif + return std::move(result); + } + +#ifdef YOSYS_ENABLE_THREADS + std::mutex mutex; + // Signals one waiter thread when the queue changes and is not full. + std::condition_variable not_full_condition; + // Signals one waiter thread when the queue changes and is not empty. + std::condition_variable not_empty_condition; +#endif + std::deque contents; + int capacity; + bool closed = false; +}; + +class DeferredLogs +{ +public: + template + void log(FmtString...> fmt, Args... args) + { + logs.push_back({fmt.format(args...), false}); + } + template + void log_error(FmtString...> fmt, Args... args) + { + logs.push_back({fmt.format(args...), true}); + } + void flush(); +private: + struct Message + { + std::string text; + bool error; + }; + std::vector logs; +}; + +class ThreadPool +{ +public: + // Computes the number of worker threads to use. + // `reserved_cores` cores are set aside for other threads (e.g. work on the main thread). + // `max_threads` --- don't return more workers than this. + // The result may be 0. + static int pool_size(int reserved_cores, int max_threads); + + // Create a pool of threads running the given closure (parameterized by thread number). + // `pool_size` must be the result of a `pool_size()` call. + ThreadPool(int pool_size, std::function b); + ThreadPool(ThreadPool &&other) = delete; + // Waits for all threads to terminate. Make sure those closures return! + ~ThreadPool(); + + // Return the number of threads in the pool. + int num_threads() const + { +#ifdef YOSYS_ENABLE_THREADS + return threads.size(); +#else + return 0; +#endif + } +private: + std::function body; +#ifdef YOSYS_ENABLE_THREADS + std::vector threads; +#endif +}; + +template +class ConcurrentStack +{ +public: + void push_back(T &&t) { +#ifdef YOSYS_ENABLE_THREADS + std::lock_guard lock(mutex); +#endif + contents.push_back(std::move(t)); + } + std::optional try_pop_back() { +#ifdef YOSYS_ENABLE_THREADS + std::lock_guard lock(mutex); +#endif + if (contents.empty()) + return std::nullopt; + T result = std::move(contents.back()); + contents.pop_back(); + return result; + } +private: +#ifdef YOSYS_ENABLE_THREADS + std::mutex mutex; +#endif + std::vector contents; +}; + +YOSYS_NAMESPACE_END + +#endif // YOSYS_THREADING_H diff --git a/kernel/yosys.cc b/kernel/yosys.cc index 196b78186..7856a8214 100644 --- a/kernel/yosys.cc +++ b/kernel/yosys.cc @@ -177,7 +177,7 @@ int run_command(const std::string &command, std::function "$vcxsrc"/YosysVS/YosysVS.vcxproj.new sed -i 's,,\n stdcpp17\n /Zc:__cplusplus %(AdditionalOptions),g' "$vcxsrc"/YosysVS/YosysVS.vcxproj.new +sed -i 's,,YOSYS_ENABLE_THREADS;,g' "$vcxsrc"/YosysVS/YosysVS.vcxproj.new if [ -f "/usr/include/FlexLexer.h" ] ; then sed -i 's,,;..\\yosys\\libs\\flex,g' "$vcxsrc"/YosysVS/YosysVS.vcxproj.new fi