diff --git a/src/smt/smt_parallel.cpp b/src/smt/smt_parallel.cpp
index 43591ee5c..44d32c6e8 100644
--- a/src/smt/smt_parallel.cpp
+++ b/src/smt/smt_parallel.cpp
@@ -40,7 +40,6 @@ namespace smt {
 
 namespace smt {
 
-
     void parallel::worker::run() {
         ast_translation tr(ctx->m, m);
         while (m.inc()) {
@@ -56,10 +55,13 @@ namespace smt {
                 // return unprocessed cubes to the batch manager
                 // add a split literal to the batch manager.
                 // optionally process other cubes and delay sending back unprocessed cubes to batch manager.
+                b.m_cubes.push_back(cube); // TODO: add access funcs for m_cubes
                 break;
             case l_true: {
                 model_ref mdl;
                 ctx->get_model(mdl);
+                if (mdl)
+                    ctx->set_model(mdl->translate(tr));
                 //b.set_sat(tr, *mdl);
                 return;
             }
@@ -68,6 +70,9 @@ namespace smt {
                 // otherwise, extract lemmas that can be shared (units (and unsat core?)).
                 // share with batch manager.
                 // process next cube.
+                ctx->m_unsat_core.reset();
+                for (expr* e : ctx->unsat_core()) // TODO: move this logic to the batch manager since this is per-thread
+                    ctx->m_unsat_core.push_back(tr(e));
                 break;
             }
         }
@@ -75,7 +80,6 @@ namespace smt {
     }
 
     parallel::worker::worker(parallel& p, context& _ctx, expr_ref_vector const& _asms): p(p), b(p.m_batch_manager), m_smt_params(_ctx.get_fparams()), asms(m) {
-
         ast_translation g2l(_ctx.m, m);
         for (auto e : _asms)
             asms.push_back(g2l(e));
@@ -85,8 +89,12 @@ namespace smt {
 
     lbool parallel::worker::check_cube(expr_ref_vector const& cube) {
-
-        return l_undef;
+        for (auto& atom : cube) {
+            asms.push_back(atom);
+        }
+        lbool r = ctx->check(asms.size(), asms.data());
+        asms.shrink(asms.size() - cube.size());
+        return r;
     }
 
     void parallel::batch_manager::get_cubes(ast_translation& g2l, vector<expr_ref_vector>& cubes) {
@@ -96,9 +104,8 @@ namespace smt {
             cubes.push_back(expr_ref_vector(g2l.to()));
             return;
         }
-        // TODO adjust to number of worker threads runnin.
-        // if the size of m_cubes is less than m_max_batch_size/ num_threads, then return fewer cubes.
-        for (unsigned i = 0; i < m_max_batch_size && !m_cubes.empty(); ++i) {
+
+        // Hand out at most m_max_batch_size / num_threads cubes, but at least one when any are available.
+        unsigned n = std::max(1u, std::min(m_max_batch_size / p.num_threads, (unsigned)m_cubes.size()));
+        for (unsigned i = 0; i < n && !m_cubes.empty(); ++i) {
            auto& cube = m_cubes.back();
            expr_ref_vector l_cube(g2l.to());
            for (auto& e : cube) {
@@ -109,6 +116,21 @@ namespace smt {
        }
    }
 
+    void parallel::batch_manager::set_sat(ast_translation& l2g, model& m) {
+        std::scoped_lock lock(mux);
+        if (m_result != l_undef)
+            return; // the first recorded result wins; never overwrite it
+        m_result = l_true;
+        // TODO: translate the model into the main context and cancel the remaining workers.
+    }
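The TODO in worker::run above asks for access functions so that workers stop touching m_cubes directly. A minimal sketch of such an accessor, mirroring the translation loop that return_cubes below already uses (the method name is hypothetical, not part of this patch):

    // Hypothetical accessor: a worker returns a single cube under the lock,
    // translating it from its local manager into the global manager.
    void parallel::batch_manager::return_cube(ast_translation& l2g, expr_ref_vector const& cube) {
        std::scoped_lock lock(mux);
        expr_ref_vector g_cube(l2g.to());
        for (expr* e : cube)
            g_cube.push_back(l2g(e));
        m_cubes.push_back(g_cube);
    }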
     void parallel::batch_manager::return_cubes(ast_translation& l2g, vector<expr_ref_vector> const& cubes, expr_ref_vector const& split_atoms) {
         std::scoped_lock lock(mux);
@@ -120,6 +142,7 @@ namespace smt {
             // TODO: split this g_cube on m_split_atoms that are not already in g_cube as literals.
             m_cubes.push_back(g_cube);
         }
+        // TODO: avoid making m_cubes too large.
 
         for (auto& atom : split_atoms) {
             expr_ref g_atom(l2g.from());
@@ -136,9 +159,27 @@ namespace smt {
         }
     }
 
+    expr_ref_vector parallel::worker::get_split_atoms() {
+        unsigned k = 1;
+
+        auto candidates = ctx->m_pq_scores.get_heap();
+        std::sort(candidates.begin(), candidates.end(),
+                  [](const auto& a, const auto& b) { return a.priority > b.priority; });
+
+        expr_ref_vector top_lits(m);
+        for (const auto& node : candidates) {
+            if (ctx->get_assignment(node.key) != l_undef)
+                continue;
+
+            expr* e = ctx->bool_var2expr(node.key);
+            if (!e)
+                continue;
+
+            top_lits.push_back(expr_ref(e, m));
+            if (top_lits.size() >= k)
+                break;
+        }
+        return top_lits;
+    }
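return_cubes above still carries a TODO to split each incoming cube on the split atoms it does not already contain. A sketch of that refinement for a single atom, which doubles the cube set (the helper name is hypothetical):

    // For a split atom a and cube c with a not occurring in c, replace c
    // by the two refined cubes c U {a} and c U {not a}.
    static void split_on_atom(vector<expr_ref_vector>& cubes, unsigned i, expr_ref const& a) {
        ast_manager& m = a.get_manager();
        expr_ref_vector pos(cubes[i]), neg(cubes[i]);
        pos.push_back(a);
        neg.push_back(m.mk_not(a));
        cubes[i] = pos;
        cubes.push_back(neg);
    }

Applied to k atoms this turns one cube into 2^k refinements, the same enumeration the removed cube_batch_pq further down performs with a bit mask.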
     lbool parallel::new_check(expr_ref_vector const& asms) {
-        ast_manager& m = ctx.m;
         {
             scoped_limits sl(m.limit());
@@ -146,6 +187,11 @@ namespace smt {
             SASSERT(num_threads > 1);
             for (unsigned i = 0; i < num_threads; ++i)
                 m_workers.push_back(alloc(worker, *this, ctx, asms));
+
+            // Register each worker's limit as a child of the scoped_limits object.
+            // If the parent limit is cancelled, the cancellation propagates to all
+            // children, so cancelling this call cancels every worker thread. The
+            // registration lasts for the lifetime of this lexical scope.
             for (auto w : m_workers)
                 sl.push_child(&(w->limit()));
 
@@ -154,8 +200,7 @@ namespace smt {
             for (unsigned i = 0; i < num_threads; ++i) {
                 threads[i] = std::thread([&, i]() {
                     m_workers[i]->run();
-                    }
-                );
+                });
             }
 
             // Wait for all threads to finish
@@ -177,12 +222,6 @@ namespace smt {
         // try first sequential with a low conflict budget to make super easy problems cheap
         // GET RID OF THIS, AND IMMEDIATELY SEND TO THE MULTITHREADED CHECKER
         // THE FIRST BATCH OF CUBES IS EMPTY, AND WE WILL SET ALL THREADS TO WORK ON THE ORIGINAL FORMULA
-        unsigned max_c = std::min(thread_max_conflicts, 40u);
-        flet<unsigned> _mc(ctx.get_fparams().m_max_conflicts, max_c);
-        result = ctx.check(asms.size(), asms.data());
-        if (result != l_undef || ctx.m_num_conflicts < max_c) {
-            return result;
-        }
 
         enum par_exception_kind {
             DEFAULT_EX,
             ERROR_EX
         };
 
@@ -226,77 +265,6 @@ namespace smt {
             sl.push_child(&(new_m->limit()));
         }
-
-        auto cube_pq = [&](context& ctx, expr_ref_vector& lasms, expr_ref& c) {
-            unsigned k = 3; // Number of top literals you want
-
-            ast_manager& m = ctx.get_manager();
-
-            // Get the entire fixed-size priority queue (it's not that big)
-            auto candidates = ctx.m_pq_scores.get_heap();
-
-            // Sort descending by priority (higher priority first)
-            std::sort(candidates.begin(), candidates.end(),
-                      [](const auto& a, const auto& b) { return a.priority > b.priority; });
-
-            expr_ref_vector conjuncts(m);
-            unsigned count = 0;
-
-            for (const auto& node : candidates) {
-                if (ctx.get_assignment(node.key) != l_undef) continue;
-
-                expr* e = ctx.bool_var2expr(node.key);
-                if (!e) continue;
-
-                expr_ref lit(e, m);
-                conjuncts.push_back(lit);
-
-                if (++count >= k) break;
-            }
-
-            c = mk_and(conjuncts);
-            lasms.push_back(c);
-        };
-
-        auto cube_score = [&](context& ctx, expr_ref_vector& lasms, expr_ref& c) {
-            vector<std::pair<expr_ref, double>> candidates;
-            unsigned k = 4; // Get top-k scoring literals
-            ast_manager& m = ctx.get_manager();
-
-            // Loop over first 100 Boolean vars
-            for (bool_var v = 0; v < 100; ++v) {
-                if (ctx.get_assignment(v) != l_undef) continue;
-
-                expr* e = ctx.bool_var2expr(v);
-                if (!e) continue;
-
-                literal lit(v, false);
-                double score = ctx.get_score(lit);
-                if (score == 0.0) continue;
-
-                candidates.push_back(std::make_pair(expr_ref(e, m), score));
-            }
-
-            // Sort all candidate literals descending by score
-            std::sort(candidates.begin(), candidates.end(),
-                      [](auto& a, auto& b) { return a.second > b.second; });
-
-            // Clear c and build it as conjunction of top-k
-            expr_ref_vector conjuncts(m);
-
-            for (unsigned i = 0; i < std::min(k, (unsigned)candidates.size()); ++i) {
-                expr_ref lit = candidates[i].first;
-                conjuncts.push_back(lit);
-            }
-
-            // Build conjunction and store in c
-            c = mk_and(conjuncts);
-
-            // Add the single cube formula to lasms (not each literal separately)
-            lasms.push_back(c);
-        };
-
         obj_hashtable<expr> unit_set;
         expr_ref_vector unit_trail(ctx.m);
         unsigned_vector unit_lim;
@@ -338,281 +306,6 @@ namespace smt {
             IF_VERBOSE(1, verbose_stream() << "(smt.thread :units " << sz << ")\n");
         };
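The hunk above elides the body of collect_units. For orientation, a sketch of the per-thread step it performs, using unit_set, unit_trail and unit_lim declared above; this assumes smt::context exposes assigned_literals() and bool_var2expr(), and it is a sketch rather than the verbatim body:

    // Sketch: translate units newly assigned by thread i into the main manager.
    // unit_lim[i] is a watermark so each literal is shared at most once.
    auto collect_units_from = [&](unsigned i) {
        context& pctx = *pctxs[i];
        ast_translation tr(pctx.m, ctx.m);
        unsigned sz = pctx.assigned_literals().size();
        for (unsigned j = unit_lim[i]; j < sz; ++j) {
            literal lit = pctx.assigned_literals()[j];
            expr_ref e(pctx.bool_var2expr(lit.var()), pctx.m);
            if (lit.sign())
                e = pctx.m.mk_not(e);
            expr_ref g_e(tr(e.get()), ctx.m);
            if (!unit_set.contains(g_e)) {
                unit_set.insert(g_e);
                unit_trail.push_back(g_e);
            }
        }
        unit_lim[i] = sz;
    };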
-        std::mutex mux;
-
-        // Lambda defining the work each SMT thread performs
-        auto worker_thread = [&](int i, vector<expr_ref_vector>& cube_batch) {
-            try {
-                // Get thread-specific context and AST manager
-                context& pctx = *pctxs[i];
-                ast_manager& pm = *pms[i];
-
-                // Initialize local assumptions and cube
-                expr_ref_vector lasms(pasms[i]);
-
-                vector<lbool> results;
-                for (expr_ref_vector& cube : cube_batch) {
-                    expr_ref_vector lasms_copy(lasms); // DON'T NEED TO COPY, JUST SHRINK BACK TO ORIGINAL SIZE
-
-                    if (&cube.get_manager() != &pm) {
-                        std::cerr << "Manager mismatch on cube: " << mk_bounded_pp(mk_and(cube), pm, 3) << "\n";
-                        UNREACHABLE(); // or throw
-                    }
-
-                    for (expr* cube_lit : cube) {
-                        lasms_copy.push_back(expr_ref(cube_lit, pm));
-                    }
-
-                    // Set the max conflict limit for this thread
-                    pctx.get_fparams().m_max_conflicts = std::min(thread_max_conflicts, max_conflicts);
-
-                    // Optional verbose logging
-                    IF_VERBOSE(1, verbose_stream() << "(smt.thread " << i;
-                               if (num_rounds > 0) verbose_stream() << " :round " << num_rounds;
-                               verbose_stream() << " :cube " << mk_bounded_pp(mk_and(cube), pm, 3);
-                               verbose_stream() << ")\n";);
-
-                    lbool r = pctx.check(lasms_copy.size(), lasms_copy.data());
-                    std::cout << "Thread " << i << " finished cube " << mk_bounded_pp(mk_and(cube), pm, 3) << " with result: " << r << "\n";
-                    results.push_back(r);
-                }
-
-                // Aggregate the batch: any SAT answer wins, otherwise UNDEF taints an all-UNSAT batch
-                lbool r = l_false;
-                for (lbool res : results) {
-                    if (res == l_true) {
-                        r = l_true;
-                    } else if (res == l_undef) {
-                        if (r == l_false)
-                            r = l_undef;
-                    }
-                }
-
-                auto cube_intersects_core = [&](expr* cube, const expr_ref_vector& core) {
-                    expr_ref_vector cube_lits(pctx.m);
-                    flatten_and(cube, cube_lits);
-                    for (expr* lit : cube_lits)
-                        if (core.contains(lit))
-                            return true;
-                    return false;
-                };
-
-                // Handle results based on outcome and conflict count
-                if (r == l_undef && pctx.m_num_conflicts >= max_conflicts)
-                    ; // no-op, allow loop to continue
-                else if (r == l_undef && pctx.m_num_conflicts >= thread_max_conflicts)
-                    return; // quit thread early
-                // If cube was unsat and it's in the core, learn from it. i.e. a thread can be UNSAT
-                // because the cube c contradicted F. In this case learn the negation of the cube ¬c.
-                // else if (r == l_false) {
-                //     // IF_VERBOSE(1, verbose_stream() << "(smt.thread " << i << " :learn cube batch " << mk_bounded_pp(cube, pm, 3) << ")" << " unsat_core: " << pctx.unsat_core() << ")");
-                //     for (expr* cube : cube_batch) { // iterate over each cube in the batch
-                //         if (cube_intersects_core(cube, pctx.unsat_core())) {
-                //             // IF_VERBOSE(1, verbose_stream() << "(pruning cube: " << mk_bounded_pp(cube, pm, 3) << " given unsat core: " << pctx.unsat_core() << ")");
-                //             pctx.assert_expr(mk_not(mk_and(pctx.unsat_core())));
-                //         }
-                //     }
-                // }
-
-                // Begin thread-safe update of shared result state
-                // THIS SHOULD ALL BE HANDLED WITHIN THE BATCH MANAGER
-                // USING METHODS LIKE SET_UNSAT AND SET_SAT WHICH KILLS THE OTHER WORKER THREADS
-                bool first = false;
-                {
-                    std::lock_guard lock(mux);
-                    if (finished_id == UINT_MAX) {
-                        finished_id = i;
-                        first = true;
-                        result = r;
-                        done = true;
-                    }
-                    if (!first && r != l_undef && result == l_undef) {
-                        finished_id = i;
-                        result = r;
-                    }
-                    else if (!first)
-                        return; // nothing new to contribute
-                }
-
-                // Cancel limits on other threads now that a result is known
-                // MOVE INSIDE BATCH MANAGER
-                for (ast_manager* m : pms) {
-                    if (m != &pm) m->limit().cancel();
-                }
-            } catch (z3_error& err) {
-                if (finished_id == UINT_MAX) {
-                    error_code = err.error_code();
-                    ex_kind = ERROR_EX;
-                    done = true;
-                }
-            } catch (z3_exception& ex) {
-                if (finished_id == UINT_MAX) {
-                    ex_msg = ex.what();
-                    ex_kind = DEFAULT_EX;
-                    done = true;
-                }
-            } catch (...) {
-                if (finished_id == UINT_MAX) {
-                    ex_msg = "unknown exception";
-                    ex_kind = ERROR_EX;
-                    done = true;
-                }
-            }
-        };
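The capitalized comments in the removed lambda above say this bookkeeping should move into the batch manager as set_sat/set_unsat methods that cancel the other workers. A sketch of the unsat side under that design; it assumes batch_manager keeps a reference p to its owning parallel object and that workers expose cancel() (per the header below), so both names are assumptions rather than confirmed API:

    // Sketch: record UNSAT once, translate the core back to the main context,
    // and cancel the remaining workers. The first recorded result wins.
    void parallel::batch_manager::set_unsat(ast_translation& l2g, expr_ref_vector const& core) {
        std::scoped_lock lock(mux);
        if (m_result != l_undef)
            return;
        m_result = l_false;
        p.ctx.m_unsat_core.reset();
        for (expr* e : core)
            p.ctx.m_unsat_core.push_back(l2g(e));
        for (auto w : p.m_workers)
            w->cancel();
    }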
-        struct BatchManager {
-            std::mutex mtx;
-            vector<vector<expr_ref_vector>> batches;
-            unsigned batch_idx = 0;
-            unsigned batch_size = 1;
-
-            BatchManager(unsigned batch_size) : batch_size(batch_size) {}
-
-            // translate the next SINGLE batch of batch_size cubes to the thread
-            vector<expr_ref_vector> get_next_batch(ast_manager& main_ctx_m, ast_manager& thread_m) {
-                std::lock_guard lock(mtx);
-                vector<expr_ref_vector> cube_batch; // ensure bound to thread manager
-                if (batch_idx >= batches.size())
-                    return cube_batch;
-
-                vector<expr_ref_vector> next_batch = batches[batch_idx];
-
-                for (const expr_ref_vector& cube : next_batch) {
-                    expr_ref_vector translated_cube_lits(thread_m);
-                    for (expr* lit : cube) {
-                        // Translate each literal to the thread's manager
-                        translated_cube_lits.push_back(translate(lit, main_ctx_m, thread_m)); // IF WE DO AST_TRANSLATION& g2l INSTEAD, THE AST MANAGER HANDLES THE TRANSLATION UNDER LOCK, THIS IS BETTER
-                    }
-                    cube_batch.push_back(translated_cube_lits);
-                }
-
-                ++batch_idx;
-
-                return cube_batch;
-            }
-
-            // returns a list (vector) of cubes, where each cube is an expr_ref_vector of literals
-            // NOTE: THE HEAP IS THREAD SPECIFIC!!! SO DON'T QUERY FROM MAIN THREAD ALL THE TIME!!
-            // PASS IN THE CONTEXT OF THE THREAD WE WANT TO QUERY THE TOP K HEAP FROM!!
-            // ALSO, WE ARE GOING TO RETURN JUST THE TOP K LITS, NOT THE 2^K TOP CUBES
-            vector<expr_ref_vector> cube_batch_pq(context& ctx) {
-                unsigned k = 1; // generates 2^k cubes in the batch
-                ast_manager& m = ctx.get_manager();
-
-                auto candidates = ctx.m_pq_scores.get_heap();
-                std::sort(candidates.begin(), candidates.end(),
-                          [](const auto& a, const auto& b) { return a.priority > b.priority; });
-
-                expr_ref_vector top_lits(m);
-                for (const auto& node : candidates) {
-                    if (ctx.get_assignment(node.key) != l_undef) continue;
-
-                    expr* e = ctx.bool_var2expr(node.key);
-                    if (!e) continue;
-
-                    top_lits.push_back(expr_ref(e, m));
-                    if (top_lits.size() >= k) break;
-                }
-
-                // std::cout << "Top lits:\n";
-                // for (unsigned j = 0; j < top_lits.size(); ++j) {
-                //     std::cout << "  [" << j << "] " << mk_pp(top_lits[j].get(), m) << "\n";
-                // }
-
                unsigned num_lits = top_lits.size();
-                unsigned num_cubes = 1 << num_lits; // 2^num_lits combinations
-
-                vector<expr_ref_vector> cube_batch;
-
-                for (unsigned mask = 0; mask < num_cubes; ++mask) {
-                    expr_ref_vector cube_lits(m);
-                    for (unsigned i = 0; i < num_lits; ++i) {
-                        expr_ref lit(top_lits[i].get(), m);
-                        if ((mask >> i) & 1)
-                            cube_lits.push_back(mk_not(lit));
-                        else
-                            cube_lits.push_back(lit);
-                    }
-                    cube_batch.push_back(cube_lits);
-                }
-
-                std::cout << "Cubes out:\n";
-                for (unsigned j = 0; j < cube_batch.size(); ++j) {
-                    std::cout << "  [" << j << "]\n";
-                    for (unsigned k = 0; k < cube_batch[j].size(); ++k) {
-                        std::cout << "    [" << k << "] " << mk_pp(cube_batch[j][k].get(), m) << "\n";
-                    }
-                }
-
-                return cube_batch;
-            };
-
-            // returns a vector of new cube batches. each cube batch is a vector of expr_ref_vector cubes
-            vector<vector<expr_ref_vector>> gen_new_batches(context& main_ctx) {
-                vector<vector<expr_ref_vector>> cube_batches;
-
-                // Get all cubes in the main context's manager
-                vector<expr_ref_vector> all_cubes = cube_batch_pq(main_ctx);
-
-                ast_manager& m = main_ctx.get_manager();
-
-                // Partition into batches
-                for (unsigned start = 0; start < all_cubes.size(); start += batch_size) {
-                    vector<expr_ref_vector> batch;
-
-                    unsigned end = std::min(start + batch_size, all_cubes.size());
-                    for (unsigned j = start; j < end; ++j) {
-                        batch.push_back(all_cubes[j]);
-                    }
-
-                    cube_batches.push_back(batch);
-                }
-                batch_idx = 0; // Reset index for next round
-                return cube_batches;
-            }
-
-            void check_for_new_batches(context& main_ctx) {
-                std::lock_guard lock(mtx);
-                if (batch_idx >= batches.size()) {
-                    batches = gen_new_batches(main_ctx);
-                }
-            }
-        };
-
-        BatchManager batch_manager(1);
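The comment inside get_next_batch above prefers passing an ast_translation (g2l) over calling translate per literal, since per that comment the manager then handles the translation under a lock. A sketch of that variant, which mirrors the new batch_manager::get_cubes at the top of this patch (the signature is hypothetical):

    // Sketch: translate one batch using a caller-provided translation object
    // instead of constructing a fresh translation for every literal.
    vector<expr_ref_vector> get_next_batch(ast_translation& g2l) {
        std::lock_guard lock(mtx);
        vector<expr_ref_vector> cube_batch;
        if (batch_idx >= batches.size())
            return cube_batch;
        for (expr_ref_vector const& cube : batches[batch_idx]) {
            expr_ref_vector l_cube(g2l.to());
            for (expr* lit : cube)
                l_cube.push_back(g2l(lit));
            cube_batch.push_back(l_cube);
        }
        ++batch_idx;
        return cube_batch;
    }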
-
-        // Thread scheduling loop
-        while (true) {
-            vector<std::thread> threads(num_threads);
-            batch_manager.check_for_new_batches(ctx);
-
-            // Launch threads
-            for (unsigned i = 0; i < num_threads; ++i) {
-                // [&, i] is the lambda's capture clause: capture all variables by reference (&) except i, which is captured by value.
-                threads[i] = std::thread([&, i]() {
-                    while (!done) {
-                        auto next_batch = batch_manager.get_next_batch(ctx.m, *pms[i]);
-                        if (next_batch.empty()) break; // No more work
-
-                        worker_thread(i, next_batch);
-                    }
-                });
-            }
-
-            // Wait for all threads to finish
-            for (auto& th : threads) {
-                th.join();
-            }
-
-            // Stop if one finished with a result
-            if (done) break;
-
-            // Otherwise update shared state and retry
-            collect_units();
-            ++num_rounds;
-            max_conflicts = (max_conflicts < thread_max_conflicts) ? 0 : (max_conflicts - thread_max_conflicts);
-            thread_max_conflicts *= 2;
-        }
-
         // Gather statistics from all solver contexts
         for (context* c : pctxs) {
             c->collect_statistics(ctx.m_aux_stats);
         }
 
@@ -625,28 +318,7 @@ namespace smt {
             default: throw default_exception(std::move(ex_msg));
             }
         }
-
-        // Handle result: translate model/unsat core back to main context
-        // THIS SHOULD GO INSIDE THE PARALLEL::WORKER::RUN FUNCTION
-        model_ref mdl;
-        context& pctx = *pctxs[finished_id];
-        ast_translation tr(*pms[finished_id], m);
-        switch (result) {
-        case l_true:
-            pctx.get_model(mdl);
-            if (mdl)
-                ctx.set_model(mdl->translate(tr));
-            break;
-        case l_false:
-            ctx.m_unsat_core.reset();
-            for (expr* e : pctx.unsat_core())
-                ctx.m_unsat_core.push_back(tr(e));
-            break;
-        default:
-            break;
-        }
-
-        return result;
+    }
 }
diff --git a/src/smt/smt_parallel.h b/src/smt/smt_parallel.h
index 7bdea79e4..316213ad4 100644
--- a/src/smt/smt_parallel.h
+++ b/src/smt/smt_parallel.h
@@ -24,6 +24,7 @@ namespace smt {
 
     class parallel {
         context& ctx;
+        unsigned num_threads;
 
         class batch_manager {
             ast_manager& m;
@@ -71,6 +72,7 @@ namespace smt {
         public:
             worker(parallel& p, context& _ctx, expr_ref_vector const& _asms);
             void run();
+            expr_ref_vector get_split_atoms();
             void cancel() {
                 m.limit().cancel();
             }
@@ -88,7 +90,12 @@ namespace smt {
         lbool new_check(expr_ref_vector const& asms);
 
     public:
-        parallel(context& ctx): ctx(ctx), m_batch_manager(ctx.m, *this) {}
+        parallel(context& ctx):
+            ctx(ctx),
+            num_threads(std::min(
+                (unsigned)std::thread::hardware_concurrency(),
+                ctx.get_fparams().m_threads)),
+            m_batch_manager(ctx.m, *this) {}
 
         lbool operator()(expr_ref_vector const& asms);
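One caveat on the new num_threads initializer: the standard allows std::thread::hardware_concurrency() to return 0 when the value is not computable, which would make num_threads 0 and trip the SASSERT(num_threads > 1) in new_check. A defensive variant, illustrative only:

    // Sketch: clamp the thread count to at least one in case
    // hardware_concurrency() returns 0 (the standard permits this).
    unsigned hw = std::thread::hardware_concurrency();
    unsigned num_threads = std::max(1u, std::min(hw == 0 ? 1u : hw, ctx.get_fparams().m_threads));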