commit c742785b65
Robert O'Callahan 2026-02-13 01:11:30 -05:00 (committed by GitHub)
14 changed files with 2115 additions and 562 deletions


@ -58,7 +58,7 @@ runs:
if: runner.os == 'Linux' && inputs.get-test-deps == 'true'
uses: awalsh128/cache-apt-pkgs-action@v1.6.0
with:
packages: libgtest-dev
packages: libgtest-dev libgmock-dev
version: ${{ inputs.runs-on }}-testys
- name: Install macOS Dependencies


@ -286,6 +286,7 @@ struct RTLILFrontendWorker {
if (width > MAX_CONST_WIDTH)
error("Constant width %lld out of range before `%s`.", width, error_token());
bits.reserve(width);
int start_idx = idx;
while (true) {
RTLIL::State bit;
switch (line[idx]) {
@ -300,8 +301,9 @@ struct RTLILFrontendWorker {
bits.push_back(bit);
++idx;
}
done:
std::reverse(bits.begin(), bits.end());
done:
if (start_idx < idx)
std::reverse(bits.begin(), bits.end());
if (GetSize(bits) > width)
bits.resize(width);


@ -22,6 +22,7 @@
#include "kernel/yosys.h"
#include "kernel/sigtools.h"
#include "kernel/threading.h"
YOSYS_NAMESPACE_BEGIN
@ -35,34 +36,55 @@ struct FfInitVals
sigmap = sigmap_;
initbits.clear();
for (auto wire : module->wires())
if (wire->attributes.count(ID::init))
process_wire(wire);
}
void process_wire(RTLIL::Wire *wire)
{
SigSpec wirebits = (*sigmap)(wire);
Const initval = wire->attributes.at(ID::init);
for (int i = 0; i < GetSize(wirebits) && i < GetSize(initval); i++)
{
if (wire->attributes.count(ID::init) == 0)
SigBit bit = wirebits[i];
State val = initval[i];
if (val != State::S0 && val != State::S1 && bit.wire != nullptr)
continue;
SigSpec wirebits = (*sigmap)(wire);
Const initval = wire->attributes.at(ID::init);
for (int i = 0; i < GetSize(wirebits) && i < GetSize(initval); i++)
{
SigBit bit = wirebits[i];
State val = initval[i];
if (val != State::S0 && val != State::S1 && bit.wire != nullptr)
continue;
if (initbits.count(bit)) {
if (initbits.at(bit).first != val)
log_error("Conflicting init values for signal %s (%s = %s != %s).\n",
log_signal(bit), log_signal(SigBit(wire, i)),
log_signal(val), log_signal(initbits.at(bit).first));
continue;
}
initbits[bit] = std::make_pair(val,SigBit(wire,i));
if (initbits.count(bit)) {
if (initbits.at(bit).first != val)
log_error("Conflicting init values for signal %s (%s = %s != %s).\n",
log_signal(bit), log_signal(SigBit(wire, i)),
log_signal(val), log_signal(initbits.at(bit).first));
continue;
}
initbits[bit] = std::make_pair(val,SigBit(wire,i));
}
}
void set_parallel(const SigMapView *sigmap_, ParallelDispatchThreadPool &thread_pool, RTLIL::Module *module)
{
sigmap = sigmap_;
initbits.clear();
const RTLIL::Module *const_module = module;
ParallelDispatchThreadPool::Subpool subpool(thread_pool, ThreadPool::work_pool_size(0, module->wires_size(), 1000));
ShardedVector<RTLIL::Wire*> init_wires(subpool);
subpool.run([const_module, &init_wires](const ParallelDispatchThreadPool::RunCtx &ctx) {
for (int i : ctx.item_range(const_module->wires_size())) {
RTLIL::Wire *wire = const_module->wire_at(i);
if (wire->attributes.count(ID::init))
init_wires.insert(ctx, wire);
}
});
for (RTLIL::Wire *wire : init_wires)
process_wire(wire);
}
RTLIL::State operator()(RTLIL::SigBit bit) const
{
auto it = initbits.find((*sigmap)(bit));


@ -324,6 +324,14 @@ void log_formatted_file_info(std::string_view filename, int lineno, std::string
log("%s:%d: Info: %s", filename, lineno, str);
}
void log_suppressed() {
if (log_debug_suppressed && !log_make_debug) {
constexpr const char* format = "<suppressed ~%d debug messages>\n";
logv_string(format, stringf(format, log_debug_suppressed));
log_debug_suppressed = 0;
}
}
[[noreturn]]
static void log_error_with_prefix(std::string_view prefix, std::string str)
{
@ -345,7 +353,9 @@ static void log_error_with_prefix(std::string_view prefix, std::string str)
}
log_last_error = std::move(str);
log("%s%s", prefix, log_last_error);
std::string message(prefix);
message += log_last_error;
logv_string("%s%s", message);
log_flush();
log_make_debug = bak_log_make_debug;
@ -355,7 +365,7 @@ static void log_error_with_prefix(std::string_view prefix, std::string str)
item.current_count++;
for (auto &[_, item] : log_expect_prefix_error)
if (std::regex_search(string(prefix) + string(log_last_error), item.pattern))
if (std::regex_search(message, item.pattern))
item.current_count++;
log_check_expected();


@ -206,12 +206,7 @@ template <typename... Args>
log_formatted_cmd_error(fmt.format(args...));
}
static inline void log_suppressed() {
if (log_debug_suppressed && !log_make_debug) {
log("<suppressed ~%d debug messages>\n", log_debug_suppressed);
log_debug_suppressed = 0;
}
}
void log_suppressed();
struct LogMakeDebugHdl {
bool status = false;


@ -22,6 +22,7 @@
#include "kernel/celltypes.h"
#include "kernel/binding.h"
#include "kernel/sigtools.h"
#include "kernel/threading.h"
#include "frontends/verilog/verilog_frontend.h"
#include "frontends/verilog/preproc.h"
#include "backends/rtlil/rtlil_backend.h"
@ -142,9 +143,17 @@ static constexpr bool check_well_known_id_order()
// and in sorted ascii order, as required by the ID macro.
static_assert(check_well_known_id_order());
constexpr int STATIC_ID_END = static_cast<int>(RTLIL::StaticId::STATIC_ID_END);
struct IdStringCollector {
IdStringCollector(std::vector<MonotonicFlag> &live_ids)
: live_ids(live_ids) {}
void trace(IdString id) {
live.insert(id.index_);
if (id.index_ >= STATIC_ID_END)
live_ids[id.index_ - STATIC_ID_END].set();
else if (id.index_ < 0)
live_autoidx_ids.push_back(id.index_);
}
template <typename T> void trace(const T* v) {
trace(*v);
@ -178,10 +187,6 @@ struct IdStringCollector {
trace(element);
}
void trace(const RTLIL::Design &design) {
trace_values(design.modules_);
trace(design.selection_vars);
}
void trace(const RTLIL::Selection &selection_var) {
trace(selection_var.selected_modules);
trace(selection_var.selected_members);
@ -190,15 +195,6 @@ struct IdStringCollector {
trace_keys(named.attributes);
trace(named.name);
}
void trace(const RTLIL::Module &module) {
trace_named(module);
trace_values(module.wires_);
trace_values(module.cells_);
trace(module.avail_parameters);
trace_keys(module.parameter_default_values);
trace_values(module.memories);
trace_values(module.processes);
}
void trace(const RTLIL::Wire &wire) {
trace_named(wire);
if (wire.known_driver())
@ -234,7 +230,8 @@ struct IdStringCollector {
trace(action.memid);
}
std::unordered_set<int> live;
std::vector<MonotonicFlag> &live_ids;
std::vector<int> live_autoidx_ids;
};
int64_t RTLIL::OwningIdString::gc_ns;
@ -243,20 +240,55 @@ int RTLIL::OwningIdString::gc_count;
void RTLIL::OwningIdString::collect_garbage()
{
int64_t start = PerformanceTimer::query();
IdStringCollector collector;
for (auto &[idx, design] : *RTLIL::Design::get_all_designs()) {
collector.trace(*design);
}
int size = GetSize(global_id_storage_);
for (int i = static_cast<int>(StaticId::STATIC_ID_END); i < size; ++i) {
RTLIL::IdString::Storage &storage = global_id_storage_.at(i);
if (storage.buf == nullptr)
continue;
if (collector.live.find(i) != collector.live.end())
continue;
if (global_refcount_storage_.find(i) != global_refcount_storage_.end())
continue;
int pool_size = 0;
for (auto &[idx, design] : *RTLIL::Design::get_all_designs())
for (RTLIL::Module *module : design->modules())
pool_size = std::max(pool_size, ThreadPool::work_pool_size(0, module->cells_size(), 1000));
ParallelDispatchThreadPool thread_pool(pool_size);
int size = GetSize(global_id_storage_);
std::vector<MonotonicFlag> live_ids(size - STATIC_ID_END);
std::vector<IdStringCollector> collectors;
int num_threads = thread_pool.num_threads();
collectors.reserve(num_threads);
for (int i = 0; i < num_threads; ++i)
collectors.emplace_back(live_ids);
for (auto &[idx, design] : *RTLIL::Design::get_all_designs()) {
for (RTLIL::Module *module : design->modules()) {
collectors[0].trace_named(*module);
ParallelDispatchThreadPool::Subpool subpool(thread_pool, ThreadPool::work_pool_size(0, module->cells_size(), 1000));
subpool.run([&collectors, module](const ParallelDispatchThreadPool::RunCtx &ctx) {
for (int i : ctx.item_range(module->cells_size()))
collectors[ctx.thread_num].trace(module->cell_at(i));
for (int i : ctx.item_range(module->wires_size()))
collectors[ctx.thread_num].trace(module->wire_at(i));
});
collectors[0].trace(module->avail_parameters);
collectors[0].trace_keys(module->parameter_default_values);
collectors[0].trace_values(module->memories);
collectors[0].trace_values(module->processes);
}
collectors[0].trace(design->selection_vars);
}
ShardedVector<int> free_ids(thread_pool);
thread_pool.run([&live_ids, size, &free_ids](const ParallelDispatchThreadPool::RunCtx &ctx) {
for (int i : ctx.item_range(size - STATIC_ID_END)) {
int index = i + STATIC_ID_END;
RTLIL::IdString::Storage &storage = global_id_storage_.at(index);
if (storage.buf == nullptr)
continue;
if (live_ids[i].load())
continue;
if (global_refcount_storage_.find(index) != global_refcount_storage_.end())
continue;
free_ids.insert(ctx, index);
}
});
for (int i : free_ids) {
RTLIL::IdString::Storage &storage = global_id_storage_.at(i);
if (yosys_xtrace) {
log("#X# Removed IdString '%s' with index %d.\n", storage.buf, i);
log_backtrace("-X- ", yosys_xtrace-1);
@ -268,8 +300,13 @@ void RTLIL::OwningIdString::collect_garbage()
global_free_idx_list_.push_back(i);
}
std::unordered_set<int> live_autoidx_ids;
for (IdStringCollector &collector : collectors)
for (int id : collector.live_autoidx_ids)
live_autoidx_ids.insert(id);
for (auto it = global_autoidx_id_storage_.begin(); it != global_autoidx_id_storage_.end();) {
if (collector.live.find(it->first) != collector.live.end()) {
if (live_autoidx_ids.find(it->first) != live_autoidx_ids.end()) {
++it;
continue;
}
@ -1466,15 +1503,21 @@ void RTLIL::Design::sort_modules()
modules_.sort(sort_by_id_str());
}
void check_module(RTLIL::Module *module, ParallelDispatchThreadPool &thread_pool);
void RTLIL::Design::check()
{
#ifndef NDEBUG
log_assert(!selection_stack.empty());
int pool_size = 0;
for (auto &it : modules_)
pool_size = std::max(pool_size, ThreadPool::work_pool_size(0, it.second->cells_size(), 1000));
ParallelDispatchThreadPool thread_pool(pool_size);
for (auto &it : modules_) {
log_assert(this == it.second->design);
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
it.second->check();
check_module(it.second, thread_pool);
}
#endif
}
@ -1710,11 +1753,11 @@ size_t RTLIL::Module::count_id(RTLIL::IdString id)
namespace {
struct InternalCellChecker
{
RTLIL::Module *module;
const RTLIL::Module *module;
RTLIL::Cell *cell;
pool<RTLIL::IdString> expected_params, expected_ports;
InternalCellChecker(RTLIL::Module *module, RTLIL::Cell *cell) : module(module), cell(cell) { }
InternalCellChecker(const RTLIL::Module *module, RTLIL::Cell *cell) : module(module), cell(cell) { }
void error(int linenr)
{
@ -2690,88 +2733,96 @@ void RTLIL::Module::sort()
it.second->attributes.sort(sort_by_id_str());
}
void RTLIL::Module::check()
void check_module(RTLIL::Module *module, ParallelDispatchThreadPool &thread_pool)
{
#ifndef NDEBUG
std::vector<bool> ports_declared;
for (auto &it : wires_) {
log_assert(this == it.second->module);
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
log_assert(it.second->width >= 0);
log_assert(it.second->port_id >= 0);
for (auto &it2 : it.second->attributes)
log_assert(!it2.first.empty());
if (it.second->port_id) {
log_assert(GetSize(ports) >= it.second->port_id);
log_assert(ports.at(it.second->port_id-1) == it.first);
log_assert(it.second->port_input || it.second->port_output);
if (GetSize(ports_declared) < it.second->port_id)
ports_declared.resize(it.second->port_id);
log_assert(ports_declared[it.second->port_id-1] == false);
ports_declared[it.second->port_id-1] = true;
} else
log_assert(!it.second->port_input && !it.second->port_output);
}
for (auto port_declared : ports_declared)
log_assert(port_declared == true);
log_assert(GetSize(ports) == GetSize(ports_declared));
ParallelDispatchThreadPool::Subpool subpool(thread_pool, ThreadPool::work_pool_size(0, module->cells_size(), 1000));
const RTLIL::Module *const_module = module;
for (auto &it : memories) {
pool<std::string> memory_strings;
for (auto &it : module->memories) {
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
log_assert(it.second->width >= 0);
log_assert(it.second->size >= 0);
for (auto &it2 : it.second->attributes)
log_assert(!it2.first.empty());
memory_strings.insert(it.second->name.str());
}
pool<IdString> packed_memids;
std::vector<MonotonicFlag> ports_declared(GetSize(module->ports));
ShardedVector<std::string> memids(subpool);
subpool.run([const_module, &ports_declared, &memory_strings, &memids](const ParallelDispatchThreadPool::RunCtx &ctx) {
for (int i : ctx.item_range(const_module->cells_size())) {
auto it = *const_module->cells_.element(i);
log_assert(const_module == it.second->module);
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
log_assert(!it.second->type.empty());
for (auto &it2 : it.second->connections()) {
log_assert(!it2.first.empty());
it2.second.check(const_module);
}
for (auto &it2 : it.second->attributes)
log_assert(!it2.first.empty());
for (auto &it2 : it.second->parameters)
log_assert(!it2.first.empty());
InternalCellChecker checker(const_module, it.second);
checker.check();
if (it.second->has_memid()) {
log_assert(memory_strings.count(it.second->parameters.at(ID::MEMID).decode_string()));
} else if (it.second->is_mem_cell()) {
std::string memid = it.second->parameters.at(ID::MEMID).decode_string();
log_assert(!memory_strings.count(memid));
memids.insert(ctx, std::move(memid));
}
auto cell_mod = const_module->design->module(it.first);
if (cell_mod != nullptr) {
// assertion check below to make sure that there are no
// cases where a cell has a blackbox attribute since
// that is deprecated
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
log_assert(!it.second->get_blackbox_attribute());
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
}
}
for (auto &it : cells_) {
log_assert(this == it.second->module);
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
log_assert(!it.second->type.empty());
for (auto &it2 : it.second->connections()) {
log_assert(!it2.first.empty());
it2.second.check(this);
for (int i : ctx.item_range(const_module->wires_size())) {
auto it = *const_module->wires_.element(i);
log_assert(const_module == it.second->module);
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
log_assert(it.second->width >= 0);
log_assert(it.second->port_id >= 0);
for (auto &it2 : it.second->attributes)
log_assert(!it2.first.empty());
if (it.second->port_id) {
log_assert(GetSize(const_module->ports) >= it.second->port_id);
log_assert(const_module->ports.at(it.second->port_id-1) == it.first);
log_assert(it.second->port_input || it.second->port_output);
log_assert(it.second->port_id <= GetSize(ports_declared));
bool previously_declared = ports_declared[it.second->port_id-1].set_and_return_old();
log_assert(previously_declared == false);
} else
log_assert(!it.second->port_input && !it.second->port_output);
}
for (auto &it2 : it.second->attributes)
log_assert(!it2.first.empty());
for (auto &it2 : it.second->parameters)
log_assert(!it2.first.empty());
InternalCellChecker checker(this, it.second);
checker.check();
if (it.second->has_memid()) {
log_assert(memories.count(it.second->parameters.at(ID::MEMID).decode_string()));
} else if (it.second->is_mem_cell()) {
IdString memid = it.second->parameters.at(ID::MEMID).decode_string();
log_assert(!memories.count(memid));
log_assert(!packed_memids.count(memid));
packed_memids.insert(memid);
}
auto cell_mod = design->module(it.first);
if (cell_mod != nullptr) {
// assertion check below to make sure that there are no
// cases where a cell has a blackbox attribute since
// that is deprecated
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
log_assert(!it.second->get_blackbox_attribute());
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
}
}
});
for (const MonotonicFlag &port_declared : ports_declared)
log_assert(port_declared.load() == true);
pool<std::string> memids_pool;
for (std::string &memid : memids)
log_assert(memids_pool.insert(memid).second);
for (auto &it : processes) {
for (auto &it : module->processes) {
log_assert(it.first == it.second->name);
log_assert(!it.first.empty());
log_assert(it.second->root_case.compare.empty());
std::vector<CaseRule*> all_cases = {&it.second->root_case};
std::vector<RTLIL::CaseRule*> all_cases = {&it.second->root_case};
for (size_t i = 0; i < all_cases.size(); i++) {
for (auto &switch_it : all_cases[i]->switches) {
for (auto &case_it : switch_it->cases) {
@ -2784,34 +2835,41 @@ void RTLIL::Module::check()
}
for (auto &sync_it : it.second->syncs) {
switch (sync_it->type) {
case SyncType::ST0:
case SyncType::ST1:
case SyncType::STp:
case SyncType::STn:
case SyncType::STe:
case RTLIL::SyncType::ST0:
case RTLIL::SyncType::ST1:
case RTLIL::SyncType::STp:
case RTLIL::SyncType::STn:
case RTLIL::SyncType::STe:
log_assert(!sync_it->signal.empty());
break;
case SyncType::STa:
case SyncType::STg:
case SyncType::STi:
case RTLIL::SyncType::STa:
case RTLIL::SyncType::STg:
case RTLIL::SyncType::STi:
log_assert(sync_it->signal.empty());
break;
}
}
}
for (auto &it : connections_) {
for (auto &it : module->connections_) {
log_assert(it.first.size() == it.second.size());
log_assert(!it.first.has_const());
it.first.check(this);
it.second.check(this);
it.first.check(module);
it.second.check(module);
}
for (auto &it : attributes)
for (auto &it : module->attributes)
log_assert(!it.first.empty());
#endif
}
void RTLIL::Module::check()
{
int pool_size = ThreadPool::work_pool_size(0, cells_size(), 1000);
ParallelDispatchThreadPool thread_pool(pool_size);
check_module(this, thread_pool);
}
void RTLIL::Module::optimize()
{
}
@ -5470,7 +5528,7 @@ RTLIL::SigSpec RTLIL::SigSpec::repeat(int num) const
}
#ifndef NDEBUG
void RTLIL::SigSpec::check(Module *mod) const
void RTLIL::SigSpec::check(const Module *mod) const
{
if (rep_ == CHUNK)
{


@ -275,6 +275,17 @@ struct RTLIL::IdString
*out += std::to_string(-index_);
}
std::string unescape() const {
if (index_ < 0) {
// Must start with "$auto$" so no unescaping required.
return str();
}
std::string_view str = global_id_storage_.at(index_).str_view();
if (str.size() < 2 || str[0] != '\\' || str[1] == '$' || str[1] == '\\' || (str[1] >= '0' && str[1] <= '9'))
return std::string(str);
return std::string(str.substr(1));
}
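A minimal sketch of the unescaping rule above; the literal names are illustrative, not from the patch:

// "\foo"     -> "foo"      leading backslash stripped from an ordinary public name
// "\$x"      -> "\$x"      kept: stripping would collide with internal '$' names
// "\\x"      -> "\\x"      kept: stripping would leave another escape character
// "\0bit"    -> "\0bit"    kept: stripping would leave a leading digit
// "$auto$..."-> unchanged  negative index, already starts with "$auto$"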
class Substrings {
std::string_view first_;
int suffix_number;
@ -758,7 +769,7 @@ namespace RTLIL {
}
static inline std::string unescape_id(RTLIL::IdString str) {
return unescape_id(str.str());
return str.unescape();
}
static inline const char *id2cstr(RTLIL::IdString str) {
@ -1748,9 +1759,9 @@ public:
}
#ifndef NDEBUG
void check(Module *mod = nullptr) const;
void check(const Module *mod = nullptr) const;
#else
void check(Module *mod = nullptr) const { (void)mod; }
void check(const Module *mod = nullptr) const { (void)mod; }
#endif
};


@ -17,6 +17,20 @@ static int get_max_threads()
return max_threads;
}
static int init_work_units_per_thread_override()
{
const char *v = getenv("YOSYS_WORK_UNITS_PER_THREAD");
if (v == nullptr)
return 0;
return atoi(v);
}
static int get_work_units_per_thread_override()
{
static int work_units_per_thread = init_work_units_per_thread_override();
return work_units_per_thread;
}
void DeferredLogs::flush()
{
for (auto &m : logs)
@ -37,6 +51,14 @@ int ThreadPool::pool_size(int reserved_cores, int max_worker_threads)
#endif
}
int ThreadPool::work_pool_size(int reserved_cores, int work_units, int work_units_per_thread)
{
int work_units_per_thread_override = get_work_units_per_thread_override();
if (work_units_per_thread_override > 0)
work_units_per_thread = work_units_per_thread_override;
return pool_size(reserved_cores, work_units / work_units_per_thread);
}
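As a worked example (the numbers are illustrative), the work units are divided first and the result is then clamped through pool_size():

// Hypothetical call; the real result is further clamped by core count,
// YOSYS_MAX_THREADS, and any reserved cores.
int n = ThreadPool::work_pool_size(/*reserved_cores=*/0,
                                   /*work_units=*/5000,
                                   /*work_units_per_thread=*/1000);
// Here 5000 / 1000 = 5 candidate worker threads, so n <= 5 (and may be 0).
// Setting YOSYS_WORK_UNITS_PER_THREAD=1 in the environment forces one unit
// per thread, maximizing parallelism for testing.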
ThreadPool::ThreadPool(int pool_size, std::function<void(int)> b)
: body(std::move(b))
{
@ -57,4 +79,72 @@ ThreadPool::~ThreadPool()
#endif
}
IntRange item_range_for_worker(int num_items, int thread_num, int num_threads)
{
if (num_threads <= 1) {
return {0, num_items};
}
int items_per_thread = num_items / num_threads;
int extra_items = num_items % num_threads;
// The first `extra_items` threads get one extra item.
int start = thread_num * items_per_thread + std::min(thread_num, extra_items);
int end = (thread_num + 1) * items_per_thread + std::min(thread_num + 1, extra_items);
return {start, end};
}
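For example, 10 items across 3 threads yield contiguous subranges whose sizes differ by at most one (the first num_items % num_threads threads take the extra item):

// item_range_for_worker(10, 0, 3) == IntRange{0, 4}   // 4 items
// item_range_for_worker(10, 1, 3) == IntRange{4, 7}   // 3 items
// item_range_for_worker(10, 2, 3) == IntRange{7, 10}  // 3 items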
ParallelDispatchThreadPool::ParallelDispatchThreadPool(int pool_size)
: num_worker_threads_(std::max(1, pool_size) - 1)
{
#ifdef YOSYS_ENABLE_THREADS
main_to_workers_signal.resize(num_worker_threads_, 0);
#endif
// Don't start the threads until we've constructed all our data members.
thread_pool = std::make_unique<ThreadPool>(num_worker_threads_, [this](int thread_num){
run_worker(thread_num);
});
}
ParallelDispatchThreadPool::~ParallelDispatchThreadPool()
{
#ifdef YOSYS_ENABLE_THREADS
if (num_worker_threads_ == 0)
return;
current_work = nullptr;
num_active_worker_threads_ = num_worker_threads_;
signal_workers_start();
wait_for_workers_done();
#endif
}
void ParallelDispatchThreadPool::run(std::function<void(const RunCtx &)> work, int max_threads)
{
Multithreading multithreading;
num_active_worker_threads_ = num_threads(max_threads) - 1;
if (num_active_worker_threads_ == 0) {
work({{0}, 1});
return;
}
#ifdef YOSYS_ENABLE_THREADS
current_work = &work;
signal_workers_start();
work({{0}, num_active_worker_threads_ + 1});
wait_for_workers_done();
#endif
}
void ParallelDispatchThreadPool::run_worker(int thread_num)
{
#ifdef YOSYS_ENABLE_THREADS
while (true)
{
worker_wait_for_start(thread_num);
if (current_work == nullptr)
break;
(*current_work)({{thread_num + 1}, num_active_worker_threads_ + 1});
signal_worker_done();
}
signal_worker_done();
#endif
}
YOSYS_NAMESPACE_END


@ -131,6 +131,11 @@ public:
// The result may be 0.
static int pool_size(int reserved_cores, int max_worker_threads);
// Computes the number of worker threads to use, by dividing work_units among threads.
// For testing purposes you can set YOSYS_WORK_UNITS_PER_THREAD to override `work_units_per_thread`.
// The result may be 0.
static int work_pool_size(int reserved_cores, int work_units, int work_units_per_thread);
// Create a pool of threads running the given closure (parameterized by thread number).
// `pool_size` must be the result of a `pool_size()` call.
ThreadPool(int pool_size, std::function<void(int)> b);
@ -154,6 +159,140 @@ private:
#endif
};
// A range of integers [start_, end_) that can be iterated over with a
// C++ range-based for loop.
struct IntRange {
int start_;
int end_;
struct Int {
int v;
int operator*() const { return v; }
Int &operator++() { ++v; return *this; }
bool operator!=(const Int &other) const { return v != other.v; }
};
Int begin() const { return {start_}; }
Int end() const { return {end_}; }
bool operator==(const IntRange &other) const { return start_ == other.start_ && end_ == other.end_; }
bool operator!=(const IntRange &other) const { return !(*this == other); }
};
// Divides some number of items into `num_threads` subranges and returns the
// `thread_num`'th subrange. If `num_threads` is zero or one, returns the whole range.
IntRange item_range_for_worker(int num_items, int thread_num, int num_threads);
// A type that encapsulates the index of a thread in some list of threads. Useful for
// stronger typechecking and code readability.
struct ThreadIndex {
int thread_num;
};
// A set of threads with a `run()` API that runs a closure on all of the threads
// and waits for all those closures to complete. This is a convenient way to implement
// parallel algorithms that use barrier synchronization.
class ParallelDispatchThreadPool
{
public:
// Create a pool of threads running the given closure (parameterized by thread number).
// `pool_size` must be the result of a `pool_size()` call.
// `pool_size` can be zero, which we treat as 1.
ParallelDispatchThreadPool(int pool_size);
~ParallelDispatchThreadPool();
// For each thread running a closure, a `RunCtx` is passed to the closure. Currently
// it contains the thread index and the total number of threads. It can be passed
// directly to any APIs requiring a `ThreadIndex`.
struct RunCtx : public ThreadIndex {
int num_threads;
IntRange item_range(int num_items) const {
return item_range_for_worker(num_items, thread_num, num_threads);
}
};
// Sometimes we only want to activate a subset of the threads in the pool. This
// class provides a way to do that. It provides the same `num_threads()`
// and `run()` APIs as a `ParallelDispatchThreadPool`.
class Subpool {
public:
Subpool(ParallelDispatchThreadPool &parent, int max_threads)
: parent(parent), max_threads(max_threads) {}
// Returns the number of threads that will be used when calling `run()`.
int num_threads() const {
return parent.num_threads(max_threads);
}
void run(std::function<void(const RunCtx &)> work) {
parent.run(std::move(work), max_threads);
}
ParallelDispatchThreadPool &thread_pool() { return parent; }
private:
ParallelDispatchThreadPool &parent;
int max_threads;
};
// Run the `work` function in parallel on each thread in the pool (parameterized by
// thread number). Waits for all work functions to complete. Only one `run()` can be
// active at a time.
// Uses no more than `max_threads` threads (but at least one).
void run(std::function<void(const RunCtx &)> work) {
run(std::move(work), INT_MAX);
}
// Returns the number of threads that will be used when calling `run()`.
int num_threads() const {
return num_threads(INT_MAX);
}
private:
friend class Subpool;
void run(std::function<void(const RunCtx &)> work, int max_threads);
int num_threads(int max_threads) const {
return std::min(num_worker_threads_ + 1, std::max(1, max_threads));
}
void run_worker(int thread_num);
std::unique_ptr<ThreadPool> thread_pool;
std::function<void(const RunCtx &)> *current_work = nullptr;
// Keeps a correct count even when threads are exiting.
int num_worker_threads_;
// The count of active worker threads for the current `run()`.
int num_active_worker_threads_ = 0;
#ifdef YOSYS_ENABLE_THREADS
// Not especially efficient for large numbers of threads. Worker wakeup could scale
// better by conceptually organising workers into a tree and having workers wake
// up their children.
std::mutex main_to_workers_signal_mutex;
std::condition_variable main_to_workers_signal_cv;
std::vector<uint8_t> main_to_workers_signal;
void signal_workers_start() {
std::unique_lock lock(main_to_workers_signal_mutex);
std::fill(main_to_workers_signal.begin(), main_to_workers_signal.begin() + num_active_worker_threads_, 1);
// When `num_active_worker_threads_` is small compared to `num_worker_threads_`, we have a "thundering herd"
// problem here. Fixing that would add complexity so don't worry about it for now.
main_to_workers_signal_cv.notify_all();
}
void worker_wait_for_start(int thread_num) {
std::unique_lock lock(main_to_workers_signal_mutex);
main_to_workers_signal_cv.wait(lock, [this, thread_num] { return main_to_workers_signal[thread_num] > 0; });
main_to_workers_signal[thread_num] = 0;
}
std::atomic<int> done_workers = 0;
std::mutex workers_to_main_signal_mutex;
std::condition_variable workers_to_main_signal_cv;
void signal_worker_done() {
int d = done_workers.fetch_add(1, std::memory_order_release);
if (d + 1 == num_active_worker_threads_) {
std::unique_lock lock(workers_to_main_signal_mutex);
workers_to_main_signal_cv.notify_all();
}
}
void wait_for_workers_done() {
std::unique_lock lock(workers_to_main_signal_mutex);
workers_to_main_signal_cv.wait(lock, [this] { return done_workers.load(std::memory_order_acquire) == num_active_worker_threads_; });
done_workers.store(0, std::memory_order_relaxed);
}
#endif
};
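A minimal usage sketch of the pool, assuming YOSYS_ENABLE_THREADS; the workload and the 1000-units-per-thread sizing are illustrative, mirroring how the patch sizes its subpools:

const int num_items = 4096;                       // illustrative workload
std::vector<int> results(num_items);
ParallelDispatchThreadPool pool(ThreadPool::work_pool_size(0, num_items, 1000));
pool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
    // Each thread gets a disjoint index range, so no locking is needed here.
    for (int i : ctx.item_range(num_items))
        results[i] = i * i;                       // stand-in for real per-item work
});
// run() only returns once every thread's closure has finished (barrier semantics).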
template <class T>
class ConcurrentStack
{
@ -181,6 +320,373 @@ private:
std::vector<T> contents;
};
// A vector that is sharded into buckets, one per thread. This lets multiple threads write
// efficiently to the vector without synchronization overhead. After all writers have
// finished writing, the vector can be iterated over. The iteration order is deterministic:
// all the elements written by thread 0 in the order it inserted them, followed by all elements
// written by thread 1, etc.
template <typename T>
class ShardedVector {
public:
ShardedVector(const ParallelDispatchThreadPool &thread_pool) {
init(thread_pool.num_threads());
}
ShardedVector(const ParallelDispatchThreadPool::Subpool &thread_pool) {
init(thread_pool.num_threads());
}
// Insert a value, passing the `ThreadIndex` of the writer thread.
// Parallel inserts with different `ThreadIndex` values are fine.
// Inserts must not run concurrently with any other methods (e.g.
// iteration or `empty()`.)
void insert(const ThreadIndex &thread, T value) {
buckets[thread.thread_num].emplace_back(std::move(value));
}
bool empty() const {
for (const std::vector<T> &bucket : buckets)
if (!bucket.empty())
return false;
return true;
}
using Buckets = std::vector<std::vector<T>>;
class iterator {
public:
iterator(typename Buckets::iterator bucket_it, typename Buckets::iterator bucket_end)
: bucket_it(std::move(bucket_it)), bucket_end(std::move(bucket_end)) {
if (bucket_it != bucket_end)
inner_it = bucket_it->begin();
normalize();
}
T& operator*() const { return *inner_it.value(); }
iterator &operator++() {
++*inner_it;
normalize();
return *this;
}
bool operator!=(const iterator &other) const {
return bucket_it != other.bucket_it || inner_it != other.inner_it;
}
private:
void normalize() {
if (bucket_it == bucket_end)
return;
while (inner_it == bucket_it->end()) {
++bucket_it;
if (bucket_it == bucket_end) {
inner_it.reset();
return;
}
inner_it = bucket_it->begin();
}
}
std::optional<typename std::vector<T>::iterator> inner_it;
typename Buckets::iterator bucket_it;
typename Buckets::iterator bucket_end;
};
iterator begin() { return iterator(buckets.begin(), buckets.end()); }
iterator end() { return iterator(buckets.end(), buckets.end()); }
private:
void init(int num_threads) {
buckets.resize(num_threads);
}
Buckets buckets;
};
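A small sketch of the write-then-read pattern; the even-number filter is an illustrative stand-in (the patch uses this same shape to collect wires carrying an init attribute):

ParallelDispatchThreadPool pool(4);               // pool size is illustrative
ShardedVector<int> evens(pool);
pool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
    for (int i : ctx.item_range(1000))
        if (i % 2 == 0)
            evens.insert(ctx, i);                 // RunCtx converts to ThreadIndex
});
// Deterministic order: all of thread 0's inserts, then thread 1's, and so on.
int sum = 0;
for (int v : evens)
    sum += v;                                     // single-threaded read phase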
template <typename V>
struct DefaultCollisionHandler {
void operator()(typename V::Accumulated &, typename V::Accumulated &) const {}
};
// A hashtable that can be efficiently built in parallel and then looked up concurrently.
// `V` is the type of elements that will be added to the hashtable. It must have a
// member type `Accumulated` representing the combination of multiple `V` elements. This
// can be the same as `V`, but for example `V` could contain a Wire* and `V::Accumulated`
// could contain a `pool<Wire*>`. `KeyEquality` is a class containing an `operator()` that
// returns true if two `V` elements have equal keys.
// `CollisionHandler` is used to reduce two `V::Accumulated` values into a single value.
//
// To use this, first construct a `Builder` and fill it in (in parallel), then construct
// a `ShardedHashSet` from the `Builder`.
template <typename V, typename KeyEquality, typename CollisionHandler = DefaultCollisionHandler<V>>
class ShardedHashSet {
public:
// A combination of a `V` and its hash value.
struct Value {
Value(V value, unsigned int hash) : value(std::move(value)), hash(hash) {}
Value(Value &&) = default;
Value(const Value &) = delete;
Value &operator=(const Value &) = delete;
V value;
unsigned int hash;
};
// A combination of a `V::Accumulated` and its hash value.
struct AccumulatedValue {
AccumulatedValue(typename V::Accumulated value, unsigned int hash) : value(std::move(value)), hash(hash) {}
AccumulatedValue(AccumulatedValue &&) = default;
#if defined(_MSC_VER)
AccumulatedValue(const AccumulatedValue &) {
log_error("Copy constructor called on AccumulatedValue");
}
AccumulatedValue &operator=(const AccumulatedValue &) {
log_error("Copy assignment called on AccumulatedValue");
return *this;
}
#else
AccumulatedValue(const AccumulatedValue &) = delete;
AccumulatedValue &operator=(const AccumulatedValue &) = delete;
#endif
typename V::Accumulated value;
unsigned int hash;
};
// A class containing an `operator()` that returns true if two `AccumulatedValue`
// elements have equal keys.
// Required to insert `AccumulatedValue`s into an `std::unordered_set`.
struct AccumulatedValueEquality {
KeyEquality inner;
AccumulatedValueEquality(const KeyEquality &inner) : inner(inner) {}
bool operator()(const AccumulatedValue &v1, const AccumulatedValue &v2) const {
return inner(v1.value, v2.value);
}
};
// A class containing an `operator()` that returns the hash value of an `AccumulatedValue`.
// Required to insert `AccumulatedValue`s into an `std::unordered_set`.
struct AccumulatedValueHashOp {
size_t operator()(const AccumulatedValue &v) const {
return static_cast<size_t>(v.hash);
}
};
using Shard = std::unordered_set<AccumulatedValue, AccumulatedValueHashOp, AccumulatedValueEquality>;
// First construct one of these. Then populate it in parallel by calling `insert()` from many threads.
// Then do another parallel phase calling `process()` from many threads.
class Builder {
public:
Builder(const ParallelDispatchThreadPool &thread_pool, KeyEquality equality = KeyEquality(), CollisionHandler collision_handler = CollisionHandler())
: collision_handler(std::move(collision_handler)) {
init(thread_pool.num_threads(), std::move(equality));
}
Builder(const ParallelDispatchThreadPool::Subpool &thread_pool, KeyEquality equality = KeyEquality(), CollisionHandler collision_handler = CollisionHandler())
: collision_handler(std::move(collision_handler)) {
init(thread_pool.num_threads(), std::move(equality));
}
// First call `insert` to insert all elements. All inserts must finish
// before calling any `process()`.
void insert(const ThreadIndex &thread, Value v) {
// You might think that for the single-threaded case, we can optimize by
// inserting directly into the `std::unordered_set` here. But that slows things down
// a lot and I never got around to figuring out why.
std::vector<std::vector<Value>> &buckets = all_buckets[thread.thread_num];
size_t bucket = static_cast<size_t>(v.hash) % buckets.size();
buckets[bucket].emplace_back(std::move(v));
}
// Then call `process` for each thread. All `process()`s must finish before using
// the `Builder` to construct a `ShardedHashSet`.
void process(const ThreadIndex &thread) {
int size = 0;
for (std::vector<std::vector<Value>> &buckets : all_buckets)
size += GetSize(buckets[thread.thread_num]);
Shard &shard = shards[thread.thread_num];
shard.reserve(size);
for (std::vector<std::vector<Value>> &buckets : all_buckets) {
for (Value &value : buckets[thread.thread_num])
accumulate(value, shard);
// Free as much memory as we can during the parallel phase.
std::vector<Value>().swap(buckets[thread.thread_num]);
}
}
private:
friend class ShardedHashSet<V, KeyEquality, CollisionHandler>;
void accumulate(Value &value, Shard &shard) {
// With C++20 we could make this more efficient using heterogeneous lookup
AccumulatedValue accumulated_value{std::move(value.value), value.hash};
auto [it, inserted] = shard.insert(std::move(accumulated_value));
if (!inserted)
collision_handler(const_cast<typename V::Accumulated &>(it->value), accumulated_value.value);
}
void init(int num_threads, KeyEquality equality) {
all_buckets.resize(num_threads);
for (std::vector<std::vector<Value>> &buckets : all_buckets)
buckets.resize(num_threads);
for (int i = 0; i < num_threads; ++i)
shards.emplace_back(0, AccumulatedValueHashOp(), AccumulatedValueEquality(equality));
}
const CollisionHandler collision_handler;
std::vector<std::vector<std::vector<Value>>> all_buckets;
std::vector<Shard> shards;
};
// Then finally construct the hashtable:
ShardedHashSet(Builder &builder) : shards(std::move(builder.shards)) {
// Check that all necessary 'process()' calls were made.
for (std::vector<std::vector<Value>> &buckets : builder.all_buckets)
for (std::vector<Value> &bucket : buckets)
log_assert(bucket.empty());
// Free memory.
std::vector<std::vector<std::vector<Value>>>().swap(builder.all_buckets);
}
ShardedHashSet(ShardedHashSet &&other) = default;
ShardedHashSet() {}
ShardedHashSet &operator=(ShardedHashSet &&other) = default;
// Look up by `AccumulatedValue`. If we switch to C++20 then we could use
// heterogeneous lookup to support looking up by `Value` here. Returns nullptr
// if the key is not found.
const typename V::Accumulated *find(const AccumulatedValue &v) const {
size_t num_shards = shards.size();
if (num_shards == 0)
return nullptr;
size_t shard = static_cast<size_t>(v.hash) % num_shards;
auto it = shards[shard].find(v);
if (it == shards[shard].end())
return nullptr;
return &it->value;
}
// Insert an element into the table. The caller is responsible for ensuring this does not
// happen concurrently with any other method calls.
void insert(AccumulatedValue v) {
size_t num_shards = shards.size();
if (num_shards == 0)
return;
size_t shard = static_cast<size_t>(v.hash) % num_shards;
shards[shard].insert(v);
}
// Call this for each shard to implement parallel destruction. For very large `ShardedHashSet`s,
// deleting all elements of all shards on a single thread can be a performance bottleneck.
void clear(const ThreadIndex &shard) {
AccumulatedValueEquality equality = shards[0].key_eq();
shards[shard.thread_num] = Shard(0, AccumulatedValueHashOp(), equality);
}
private:
std::vector<Shard> shards;
};
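The two-phase build protocol in miniature; IntValue and IntValueEquality are the same illustrative helpers defined in the unit tests further down:

using HashSet = ShardedHashSet<IntValue, IntValueEquality>;
ParallelDispatchThreadPool pool(2);
HashSet::Builder builder(pool);
pool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
    builder.insert(ctx, {{ctx.thread_num}, unsigned(ctx.thread_num)});  // phase 1: all inserts
});
pool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
    builder.process(ctx);                         // phase 2: per-thread process()
});
HashSet set(builder);                             // consumes the builder
const IntValue *hit = set.find({{0}, 0u});        // now safe to call concurrently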
// A concurrent work-queue that can share batches of work across threads.
// Uses a naive implementation of work-stealing.
template <typename T>
class ConcurrentWorkQueue {
public:
// Create a queue that supports the given number of threads and
// groups work into `batch_size` units.
ConcurrentWorkQueue(int num_threads, int batch_size = 100)
: batch_size(batch_size), thread_states(num_threads) {}
int num_threads() const { return GetSize(thread_states); }
// Push some work to do. Pushes and pops with the same `thread` must
// not happen concurrently.
void push(const ThreadIndex &thread, T work) {
ThreadState &thread_state = thread_states[thread.thread_num];
thread_state.next_batch.emplace_back(std::move(work));
if (GetSize(thread_state.next_batch) < batch_size)
return;
bool was_empty;
{
std::unique_lock lock(thread_state.batches_lock);
was_empty = thread_state.batches.empty();
thread_state.batches.push_back(std::move(thread_state.next_batch));
}
if (was_empty) {
std::unique_lock lock(waiters_lock);
if (num_waiters > 0) {
waiters_cv.notify_one();
}
}
}
// Grab some work to do.
// If all threads enter `pop_batch()`, then instead of deadlocking the
// queue will return no work. That is the only case in which it will
// return no work.
std::vector<T> pop_batch(const ThreadIndex &thread) {
ThreadState &thread_state = thread_states[thread.thread_num];
if (!thread_state.next_batch.empty())
return std::move(thread_state.next_batch);
// Empty our own work queue first.
{
std::unique_lock lock(thread_state.batches_lock);
if (!thread_state.batches.empty()) {
std::vector<T> batch = std::move(thread_state.batches.back());
thread_state.batches.pop_back();
return batch;
}
}
// From here on in this function, our work queue is empty.
while (true) {
std::vector<T> batch = try_steal(thread);
if (!batch.empty()) {
return std::move(batch);
}
// Termination: if all threads run out of work, then all of
// them will eventually enter this loop and there will be no further
// notifications on waiters_cv, so all will eventually increment
// num_waiters and wait, so num_waiters == num_threads()
// will become true.
std::unique_lock lock(waiters_lock);
++num_waiters;
if (num_waiters == num_threads()) {
waiters_cv.notify_all();
return {};
}
// As above, it's possible that we'll wait here even when there
// are work batches posted by other threads. That's OK.
waiters_cv.wait(lock);
if (num_waiters == num_threads())
return {};
--num_waiters;
}
}
private:
std::vector<T> try_steal(const ThreadIndex &thread) {
for (int i = 1; i < num_threads(); i++) {
int other_thread_num = (thread.thread_num + i) % num_threads();
ThreadState &other_thread_state = thread_states[other_thread_num];
std::unique_lock lock(other_thread_state.batches_lock);
if (!other_thread_state.batches.empty()) {
std::vector<T> batch = std::move(other_thread_state.batches.front());
other_thread_state.batches.pop_front();
return batch;
}
}
return {};
}
int batch_size;
struct ThreadState {
// Entirely thread-local.
std::vector<T> next_batch;
std::mutex batches_lock;
// Only the associated thread ever adds to this, and only at the back.
// Other threads can remove elements from the front.
std::deque<std::vector<T>> batches;
};
std::vector<ThreadState> thread_states;
std::mutex waiters_lock;
std::condition_variable waiters_cv;
// Number of threads waiting for work. Their queues are empty.
int num_waiters = 0;
};
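In miniature, the push/pop discipline looks like this, assuming YOSYS_ENABLE_THREADS; the seed items and batch size are illustrative (the unit tests below exercise the same pattern):

ParallelDispatchThreadPool pool(2);
ConcurrentWorkQueue<int> queue(pool.num_threads(), /*batch_size=*/8);
pool.run([&](const ParallelDispatchThreadPool::RunCtx &ctx) {
    queue.push(ctx, ctx.thread_num);              // seed one work item per thread
    while (true) {
        std::vector<int> batch = queue.pop_batch(ctx);
        if (batch.empty())
            break;                                // every thread idle: queue drained
        for (int item : batch) {
            // Process `item`; newly discovered work may be push()ed back here.
            (void)item;
        }
    }
});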
// A monotonic flag. Starts false, and can be set to true in a thread-safe way.
// Once `load()` returns true, it will always return true.
// Uses relaxed atomics so there are no memory ordering guarantees. Do not use this
// to guard access to shared memory.
class MonotonicFlag {
public:
MonotonicFlag() : value(false) {}
bool load() const { return value.load(std::memory_order_relaxed); }
void set() { value.store(true, std::memory_order_relaxed); }
bool set_and_return_old() {
return value.exchange(true, std::memory_order_relaxed);
}
private:
std::atomic<bool> value;
};
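The set_and_return_old() form enables a lock-free claim-once pattern; check_module() above uses exactly this to assert that each port id is declared only once. A minimal sketch:

std::vector<MonotonicFlag> claimed(16);           // one flag per illustrative slot
int slot = 3;                                     // illustrative slot index
// From any thread:
if (!claimed[slot].set_and_return_old()) {
    // Exactly one thread observes `false` for a given slot and claims it.
}
// Relaxed ordering only: do not rely on the flag to publish other shared data.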
YOSYS_NAMESPACE_END
#endif // YOSYS_THREADING_H

File diff suppressed because it is too large


@ -0,0 +1,9 @@
read_rtlil << EOT
module \top
attribute \init 1'0
wire \w
connect \w 1'0
end
EOT
opt_clean


@ -8,7 +8,7 @@
"end\n"
]
],
"<WIRE>": [ [ " wire width ", "<WIDTH>", " ", "<WIRE_MODE>", " ", "<WIRE_ID>", "\n" ] ],
"<WIRE>": [ [ "<WIRE_ATTRIBUTES>", " wire width ", "<WIDTH>", " ", "<WIRE_MODE>", " ", "<WIRE_ID>", "\n" ] ],
"<WIDTH>": [ [ "1" ], [ "2" ], [ "3" ], [ "4" ], [ "32" ], [ "128" ] ],
"<WIRE_MODE>": [ [ "input ", "<PORT_ID>" ], [ "output ", "<PORT_ID>" ], [ "inout ", "<PORT_ID>" ], [] ],
"<CELL>": [
@ -71,6 +71,7 @@
" end\n"
]
],
"<WIRE_ATTRIBUTE>": [ [ " attribute \\init ", "<CONST>", "\n" ] ],
"<WIRE_ID>": [ [ "\\wire_a" ], [ "\\wire_b" ], [ "\\wire_c" ], [ "\\wire_d" ], [ "\\wire_e" ], [ "\\wire_f" ], [ "\\wire_g" ], [ "\\wire_h" ], [ "\\wire_i" ], [ "\\wire_j" ] ],
"<CELL_ID>": [ [ "\\cell_a" ], [ "\\cell_b" ], [ "\\cell_c" ], [ "\\cell_d" ], [ "\\cell_e" ], [ "\\cell_f" ], [ "\\cell_g" ], [ "\\cell_h" ], [ "\\cell_i" ], [ "\\cell_j" ] ],
"<BLACKBOX_CELL>": [ [ "\\bb1" ], [ "\\bb2" ] ],
@ -97,6 +98,7 @@
"<CONNECT>": [ [ " connect ", "<SIGSPEC>", " ", "<SIGSPEC>", "\n" ] ],
"<WIRES>": [ [ ], [ "<WIRE>", "<WIRES>" ] ],
"<WIRE_ATTRIBUTES>": [ [ ], [ "<WIRE_ATTRIBUTE>", "<WIRE_ATTRIBUTES>" ] ],
"<CELLS>": [ [ ], [ "<CELL>", "<CELLS>" ] ],
"<BITS>": [ [ ], [ "<BIT>", "<BITS>" ] ],
"<CONNECTS>": [ [ ], [ "<CONNECT>", "<CONNECTS>" ] ],


@ -4,10 +4,10 @@ UNAME_S := $(shell uname -s)
GTEST_PREFIX := $(shell brew --prefix googletest 2>/dev/null)
ifeq ($(GTEST_PREFIX),)
GTEST_CXXFLAGS :=
GTEST_LDFLAGS := -lgtest -lgtest_main
GTEST_LDFLAGS := -lgtest -lgmock -lgtest_main
else
GTEST_CXXFLAGS := -I$(GTEST_PREFIX)/include
GTEST_LDFLAGS := -L$(GTEST_PREFIX)/lib -lgtest -lgtest_main
GTEST_LDFLAGS := -L$(GTEST_PREFIX)/lib -lgtest -lgmock -lgtest_main
endif
ifeq ($(UNAME_S),Darwin)


@ -0,0 +1,442 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "kernel/threading.h"
YOSYS_NAMESPACE_BEGIN
class ThreadingTest : public testing::Test {
protected:
ThreadingTest() {
if (log_files.empty())
log_files.emplace_back(stdout);
}
};
TEST_F(ThreadingTest, ParallelDispatchThreadPoolCreate) {
// Test creating a pool with 0 threads (treated as 1)
ParallelDispatchThreadPool pool0(0);
EXPECT_EQ(pool0.num_threads(), 1);
// Test creating a pool with 1 thread
ParallelDispatchThreadPool pool1(1);
EXPECT_EQ(pool1.num_threads(), 1);
// Test creating a pool with 2 threads
ParallelDispatchThreadPool pool2(2);
// YOSYS_MAX_THREADS or system configuration could mean we
// decide to only use one thread.
EXPECT_GE(pool2.num_threads(), 1);
EXPECT_LE(pool2.num_threads(), 2);
}
TEST_F(ThreadingTest, ParallelDispatchThreadPoolRunSimple) {
ParallelDispatchThreadPool pool(2);
std::atomic<int> counter{0};
pool.run([&counter](const ParallelDispatchThreadPool::RunCtx &) {
counter.fetch_add(1, std::memory_order_relaxed);
});
EXPECT_EQ(counter.load(), pool.num_threads());
}
TEST_F(ThreadingTest, ParallelDispatchThreadPoolRunMultiple) {
ParallelDispatchThreadPool pool(2);
std::atomic<int> counter{0};
// Run multiple times to verify the pool can be reused
for (int i = 0; i < 5; ++i)
pool.run([&counter](const ParallelDispatchThreadPool::RunCtx &) {
counter.fetch_add(1, std::memory_order_relaxed);
});
EXPECT_EQ(counter.load(), pool.num_threads() * 5);
}
TEST_F(ThreadingTest, ParallelDispatchThreadPoolRunCtxThreadNums) {
ParallelDispatchThreadPool pool(4);
std::vector<int> thread_nums(pool.num_threads(), -1);
pool.run([&thread_nums](const ParallelDispatchThreadPool::RunCtx &ctx) {
thread_nums[ctx.thread_num] = ctx.thread_num;
});
// Every thread should have recorded its own thread number
for (int i = 0; i < pool.num_threads(); ++i)
EXPECT_EQ(thread_nums[i], i);
}
TEST_F(ThreadingTest, ParallelDispatchThreadPoolItemRange) {
ParallelDispatchThreadPool pool(3);
const int num_items = 100;
std::vector<std::atomic<int>> item_counts(num_items);
for (std::atomic<int> &c : item_counts)
c.store(0);
pool.run([&item_counts](const ParallelDispatchThreadPool::RunCtx &ctx) {
for (int i : ctx.item_range(num_items))
item_counts[i].fetch_add(1);
});
// Each item should have been processed exactly once
for (int i = 0; i < num_items; ++i)
EXPECT_EQ(item_counts[i].load(), 1);
}
TEST_F(ThreadingTest, ParallelDispatchThreadPoolSubpool) {
ParallelDispatchThreadPool pool(4);
// Subpool limited to 2 threads
ParallelDispatchThreadPool::Subpool subpool(pool, 2);
EXPECT_LE(subpool.num_threads(), 2);
std::atomic<int> counter{0};
subpool.run([&counter](const ParallelDispatchThreadPool::RunCtx &) {
counter.fetch_add(1, std::memory_order_relaxed);
});
EXPECT_EQ(counter.load(), subpool.num_threads());
}
TEST_F(ThreadingTest, IntRangeIteration) {
IntRange range{3, 7};
std::vector<int> values;
for (int i : range)
values.push_back(i);
EXPECT_THAT(values, testing::ElementsAre(3, 4, 5, 6));
}
TEST_F(ThreadingTest, IntRangeEmpty) {
IntRange range{5, 5};
for (int _ : range)
FAIL();
}
TEST_F(ThreadingTest, ItemRangeForWorker) {
EXPECT_EQ(item_range_for_worker(10, 0, 3), (IntRange{0, 4}));
EXPECT_EQ(item_range_for_worker(10, 1, 3), (IntRange{4, 7}));
EXPECT_EQ(item_range_for_worker(10, 2, 3), (IntRange{7, 10}));
}
TEST_F(ThreadingTest, ItemRangeForWorkerZeroThreads) {
EXPECT_EQ(item_range_for_worker(10, 0, 0), (IntRange{0, 10}));
}
TEST_F(ThreadingTest, ShardedVectorBasic) {
ParallelDispatchThreadPool pool(2);
ShardedVector<int> vec(pool);
pool.run([&vec](const ParallelDispatchThreadPool::RunCtx &ctx) {
vec.insert(ctx, ctx.thread_num * 10);
vec.insert(ctx, ctx.thread_num * 10 + 1);
});
EXPECT_FALSE(vec.empty());
// Count elements
std::vector<int> elements;
for (int v : vec) {
elements.push_back(v);
}
if (pool.num_threads() == 2)
EXPECT_THAT(elements, testing::ElementsAre(0, 1, 10, 11));
else
EXPECT_THAT(elements, testing::ElementsAre(0, 1));
}
TEST_F(ThreadingTest, MonotonicFlagBasic) {
MonotonicFlag flag;
EXPECT_FALSE(flag.load());
flag.set();
EXPECT_TRUE(flag.load());
flag.set();
EXPECT_TRUE(flag.load());
}
TEST_F(ThreadingTest, MonotonicFlagSetAndReturnOld) {
MonotonicFlag flag;
EXPECT_FALSE(flag.set_and_return_old());
EXPECT_TRUE(flag.load());
EXPECT_TRUE(flag.set_and_return_old());
}
TEST_F(ThreadingTest, ConcurrentQueueBasic) {
ConcurrentQueue<int> queue;
queue.push_back(1);
queue.push_back(2);
queue.push_back(3);
auto v1 = queue.pop_front();
auto v2 = queue.pop_front();
auto v3 = queue.pop_front();
ASSERT_TRUE(v1.has_value());
ASSERT_TRUE(v2.has_value());
ASSERT_TRUE(v3.has_value());
EXPECT_EQ(*v1, 1);
EXPECT_EQ(*v2, 2);
EXPECT_EQ(*v3, 3);
}
TEST_F(ThreadingTest, ConcurrentQueueTryPopEmpty) {
ConcurrentQueue<int> queue;
auto v = queue.try_pop_front();
EXPECT_FALSE(v.has_value());
}
TEST_F(ThreadingTest, ConcurrentQueueClose) {
ConcurrentQueue<int> queue;
queue.push_back(42);
queue.close();
// Can still pop existing elements
auto v1 = queue.pop_front();
ASSERT_TRUE(v1.has_value());
EXPECT_EQ(*v1, 42);
// After close and empty, pop_front returns nullopt
auto v2 = queue.pop_front();
EXPECT_FALSE(v2.has_value());
}
TEST_F(ThreadingTest, ThreadPoolCreate) {
// pool_size of 0 means no worker threads
ThreadPool pool0(0, [](int) {});
EXPECT_EQ(pool0.num_threads(), 0);
// pool_size of 1 means 1 worker thread
std::atomic<int> counter{0};
{
ThreadPool pool1(1, [&counter](int thread_num) {
EXPECT_EQ(thread_num, 0);
counter.fetch_add(1);
});
}
#ifdef YOSYS_ENABLE_THREADS
EXPECT_EQ(counter.load(), 1);
#else
EXPECT_EQ(counter.load(), 0);
#endif
}
TEST_F(ThreadingTest, ThreadPoolMultipleThreads) {
std::atomic<int> counter{0};
{
ThreadPool pool(2, [&counter](int) {
counter.fetch_add(1);
});
EXPECT_LE(pool.num_threads(), 2);
}
#ifdef YOSYS_ENABLE_THREADS
EXPECT_GE(counter.load(), 1);
EXPECT_LE(counter.load(), 2);
#else
EXPECT_EQ(counter.load(), 0);
#endif
}
// Helper types for ShardedHashSet tests
struct IntValue {
using Accumulated = IntValue;
int value;
operator int() const { return value; }
};
struct IntValueEquality {
bool operator()(int a, int b) const { return a == b; }
};
TEST_F(ThreadingTest, ShardedHashSetBasic) {
ParallelDispatchThreadPool pool(1);
using HashSet = ShardedHashSet<IntValue, IntValueEquality>;
HashSet::Builder builder(pool);
// Insert some values
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
builder.insert(ctx, {{10}, 10});
builder.insert(ctx, {{20}, 20});
builder.insert(ctx, {{30}, 30});
});
// Process
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
builder.process(ctx);
});
// Build and lookup
HashSet set(builder);
const IntValue *found10 = set.find({{10}, 10});
const IntValue *found20 = set.find({{20}, 20});
const IntValue *found99 = set.find({{99}, 99});
ASSERT_NE(found10, nullptr);
ASSERT_NE(found20, nullptr);
EXPECT_EQ(found99, nullptr);
EXPECT_EQ(*found10, 10);
EXPECT_EQ(*found20, 20);
}
TEST_F(ThreadingTest, ShardedHashSetParallelInsert) {
ParallelDispatchThreadPool pool(3);
using HashSet = ShardedHashSet<IntValue, IntValueEquality>;
HashSet::Builder builder(pool);
// Insert values from multiple threads
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
for (int i = 0; i < 10; ++i) {
int val = ctx.thread_num * 100 + i;
builder.insert(ctx, {{val}, static_cast<unsigned>(val)});
}
});
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
builder.process(ctx);
});
HashSet set(builder);
// Verify all values can be found
for (int t = 0; t < pool.num_threads(); ++t) {
for (int i = 0; i < 10; ++i) {
int val = t * 100 + i;
const IntValue *found = set.find({{val}, static_cast<unsigned>(val)});
ASSERT_NE(found, nullptr) << "Value " << val << " not found";
EXPECT_EQ(*found, val);
}
}
}
// Helper types for ShardedHashSet collision-handler tests
struct IntDictValue {
using Accumulated = IntDictValue;
int key;
int value;
bool operator==(const IntDictValue &other) const { return key == other.key && value == other.value; }
bool operator!=(const IntDictValue &other) const { return !(*this == other); }
};
struct IntDictKeyEquality {
bool operator()(const IntDictValue &a, const IntDictValue &b) const { return a.key == b.key; }
};
// Collision handler that sums values
struct SumCollisionHandler {
void operator()(IntDictValue &existing, IntDictValue &incoming) const {
existing.value += incoming.value;
}
};
TEST_F(ThreadingTest, ShardedHashSetCollision) {
ParallelDispatchThreadPool pool(1);
using HashSet = ShardedHashSet<IntDictValue, IntDictKeyEquality, SumCollisionHandler>;
HashSet::Builder builder(pool);
// Insert duplicate keys with same hash - duplicates should collapse
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
builder.insert(ctx, {{5, 10}, 5});
builder.insert(ctx, {{5, 12}, 5}); // Duplicate key/hash
builder.insert(ctx, {{5, 14}, 5}); // Another duplicate
});
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
builder.process(ctx);
});
HashSet set(builder);
const IntDictValue *found = set.find({{5, 0}, 5});
ASSERT_NE(found, nullptr);
// SumCollisionHandler sums the colliding values: 10 + 12 + 14 = 36
EXPECT_EQ(*found, (IntDictValue{5, 36}));
}
TEST_F(ThreadingTest, ShardedHashSetEmpty) {
ParallelDispatchThreadPool pool(1);
using HashSet = ShardedHashSet<IntValue, IntValueEquality>;
HashSet::Builder builder(pool);
// Don't insert anything, just process
pool.run([&builder](const ParallelDispatchThreadPool::RunCtx &ctx) {
builder.process(ctx);
});
HashSet set(builder);
const IntValue *found = set.find({{42}, 42});
EXPECT_EQ(found, nullptr);
}
TEST_F(ThreadingTest, ConcurrentWorkQueueSingleThread) {
ConcurrentWorkQueue<int> queue(1, 10); // 1 thread, batch size 10
EXPECT_EQ(queue.num_threads(), 1);
ThreadIndex thread{0};
// Push some items (less than batch size)
for (int i = 0; i < 5; ++i)
queue.push(thread, i);
// Pop should return those items
std::vector<int> batch = queue.pop_batch(thread);
EXPECT_THAT(batch, testing::UnorderedElementsAre(0, 1, 2, 3, 4));
// Next pop should return empty (all threads "waiting")
std::vector<int> empty_batch = queue.pop_batch(thread);
EXPECT_TRUE(empty_batch.empty());
}
TEST_F(ThreadingTest, ConcurrentWorkQueueBatching) {
ConcurrentWorkQueue<int> queue(1, 3); // batch size 3
ThreadIndex thread{0};
queue.push(thread, 10);
queue.push(thread, 20);
queue.push(thread, 30);
queue.push(thread, 40);
queue.push(thread, 50);
std::vector<int> popped;
while (true) {
std::vector<int> batch = queue.pop_batch(thread);
if (batch.empty())
break;
popped.insert(popped.end(), batch.begin(), batch.end());
}
EXPECT_THAT(popped, testing::UnorderedElementsAre(10, 20, 30, 40, 50));
}
TEST_F(ThreadingTest, ConcurrentWorkQueueParallel) {
ParallelDispatchThreadPool pool(2);
if (pool.num_threads() < 2) {
// Skip test if we don't have multiple threads
return;
}
ConcurrentWorkQueue<int> queue(2, 3);
std::atomic<int> sum{0};
pool.run([&queue, &sum](const ParallelDispatchThreadPool::RunCtx &ctx) {
// Each thread pushes some work
for (int i = 0; i < 10; ++i)
queue.push(ctx, ctx.thread_num * 100 + i);
// Each thread processes work until done
while (true) {
std::vector<int> batch = queue.pop_batch(ctx);
if (batch.empty())
break;
for (int v : batch)
sum.fetch_add(v);
}
});
// Thread 0 pushes: 0+1+2+...+9 = 45
// Thread 1 pushes: 100+101+...+109 = 1045
// Total = 45 + 1045 = 1090
EXPECT_EQ(sum.load(), 1090);
}
YOSYS_NAMESPACE_END