diff --git a/src/smt/smt_parallel.cpp b/src/smt/smt_parallel.cpp index 065958a08..db33cda14 100644 --- a/src/smt/smt_parallel.cpp +++ b/src/smt/smt_parallel.cpp @@ -49,8 +49,10 @@ namespace smt { if (cubes.empty()) return; for (auto& cube : cubes) { - if (!m.inc()) + if (!m.inc()) { + b.set_exception("context cancelled"); return; // stop if the main context is cancelled + } switch (check_cube(cube)) { case l_undef: { // return unprocessed cubes to the batch manager @@ -78,13 +80,13 @@ namespace smt { // If the unsat core only contains assumptions, // unsatisfiability does not depend on the current cube and the entire problem is unsat. if (all_of(unsat_core, [&](expr* e) { return asms.contains(e); })) { - std::cout << "Worker " << id << " determined formula unsat"; + IF_VERBOSE(0, verbose_stream() << "Worker " << id << " determined formula unsat"); b.set_unsat(l2g, unsat_core); return; } // TODO: can share lemmas here, such as new units and not(and(unsat_core)), binary clauses, etc. // TODO: remember assumptions used in core so that they get used for the final core. - std::cout << "Worker " << id << " found unsat cube: " << mk_pp(mk_and(cube), m) << "\n"; + IF_VERBOSE(0, verbose_stream() << "Worker " << id << " found unsat cube: " << mk_pp(mk_and(cube), m) << "\n"); b.share_lemma(l2g, mk_not(mk_and(unsat_core))); // share_units(); break; @@ -200,18 +202,18 @@ namespace smt { void parallel::batch_manager::set_sat(ast_translation& l2g, model& m) { std::scoped_lock lock(mux); - if (l_true == m_result) + if (m_state != state::is_running) return; - m_result = l_true; + m_state = state::is_sat; p.ctx.set_model(m.translate(l2g)); cancel_workers(); } void parallel::batch_manager::set_unsat(ast_translation& l2g, expr_ref_vector const& unsat_core) { std::scoped_lock lock(mux); - if (l_false == m_result) + if (m_state != state::is_running) return; - m_result = l_false; + m_state = state::is_unsat; p.ctx.m_unsat_core.reset(); for (expr* e : unsat_core) p.ctx.m_unsat_core.push_back(l2g(e)); @@ -220,43 +222,94 @@ namespace smt { void parallel::batch_manager::set_exception(unsigned error_code) { std::scoped_lock lock(mux); - if (m_exception_kind != NO_EX) - return; // already set - m_exception_kind = ERROR_CODE_EX; + if (m_state != state::is_running) + return; + m_state = state::is_exception_code; m_exception_code = error_code; cancel_workers(); } void parallel::batch_manager::set_exception(std::string const& msg) { std::scoped_lock lock(mux); - if (m_exception_kind != NO_EX) - return; // already set - m_exception_kind = ERROR_MSG_EX; + if (m_state != state::is_running || m.limit().is_canceled()) + return; + m_state = state::is_exception_msg; m_exception_msg = msg; cancel_workers(); } lbool parallel::batch_manager::get_result() const { - if (m_exception_kind == ERROR_MSG_EX) - throw default_exception(m_exception_msg.c_str()); - if (m_exception_kind == ERROR_CODE_EX) - throw z3_error(m_exception_code); if (m.limit().is_canceled()) return l_undef; // the main context was cancelled, so we return undef. - return m_result; + switch (m_state) { + case state::is_running: + if (!m_cubes.empty()) + throw default_exception("inconsistent end state"); + // TODO collect unsat core from assumptions, if any. + return l_false; + case state::is_unsat: + return l_false; + case state::is_sat: + return l_true; + case state::is_exception_msg: + throw default_exception(m_exception_msg.c_str()); + case state::is_exception_code: + throw z3_error(m_exception_code); + default: + UNREACHABLE(); + } } -#if 0 - for (auto& c : m_cubes) { - expr_ref_vector g_cube(l2g.to()); - for (auto& e : c) { - g_cube.push_back(l2g(e)); - } - share_lemma(l2g, mk_and(g_cube)); - } -#endif - - void parallel::batch_manager::return_cubes(ast_translation& l2g, vectorconst& cubes, expr_ref_vector const& split_atoms) { + // + // Batch manager maintains C_batch, A_batch. + // C_batch - set of cubes + // A_batch - set of split atoms. + // return_cubes is called with C_batch A_batch C A. + // C_worker - one or more cubes + // A_worker - split atoms form the worker thread. + // + // Assumption: A_worker does not occur in C_worker. + // + // Greedy strategy: + // + // return_cubes C_batch A_batch C_worker A_worker: + // C_batch <- { cube * 2^(A_worker u (A_batch \ atoms(cube)) | cube in C_worker } u + // { cube * 2^(A_worker \ A_batch) | cube in C_batch } + // = + // let C_batch' = C_batch u { cube * 2^(A_batch \ atoms(cube)) | cube in C_worker } + // { cube * 2^(A_worker \ A_batch) | cube in C_batch' } + // A_batch <- A_batch u A_worker + // + // Frugal strategy: + // + // return_cubes C_batch A_batch [[]] A_worker: + // C_batch <- C_batch u 2^(A_worker u A_batch), + // A_batch <- A_batch u A_worker + // + // return_cubes C_batch A_batch C_worker A_worker: + // C_batch <- C_batch u { cube * 2^A_worker | cube in C_worker }. + // A_batch <- A_batch u A_worker + // + // Between Frugal and Greedy: (generalizes the first case of empty cube returned by worker) + // C_batch <- C_batch u { cube * 2^(A_worker u (A_batch \ atoms(cube)) | cube in C_worker } + // A_batch <- A_batch u A_worker + // + // Or: use greedy strategy by a policy when C_batch, A_batch, A_worker are "small". + // + void parallel::batch_manager::return_cubes(ast_translation& l2g, vectorconst& cubes, expr_ref_vector const& a_worker) { + auto atom_in_cube = [&](expr_ref_vector const& cube, expr* atom) { + return any_of(cube, [&](expr* e) { return e == atom || (m.is_not(e, e) && e == atom); }); + }; + + auto add_split_atom = [&](expr* atom, unsigned start) { + unsigned stop = m_cubes.size(); + for (unsigned i = start; i < stop; ++i) { + m_cubes.push_back(m_cubes[i]); // push copy of m_cubes[i] + m_cubes.back().push_back(m.mk_not(atom)); // add ¬atom to the copy + m_cubes[i].push_back(atom); // add atom to the original + } + }; + std::scoped_lock lock(mux); for (auto & c : cubes) { expr_ref_vector g_cube(l2g.to()); @@ -264,34 +317,25 @@ namespace smt { g_cube.push_back(l2g(atom)); } + unsigned start = m_cubes.size(); m_cubes.push_back(g_cube); // base cube - expr_ref_vector& base = m_cubes.back(); for (auto& atom : m_split_atoms) { - if (g_cube.contains(atom) || g_cube.contains(m.mk_not(atom))) + if (atom_in_cube(g_cube, atom)) continue; - - // Split base: one copy with ¬atom, one with atom - m_cubes.push_back(base); // push new copy of base cube - m_cubes.back().push_back(m.mk_not(atom)); // add ¬atom to new copy - base.push_back(atom); // add atom to base cube + add_split_atom(atom, start); } } // TODO: avoid making m_cubes too large. // QUESTION: do we need to check if any split_atoms are already in the cubes in m_cubes?? - for (auto& atom : split_atoms) { + for (auto& atom : a_worker) { expr_ref g_atom(l2g.to()); g_atom = l2g(atom); if (m_split_atoms.contains(g_atom)) continue; m_split_atoms.push_back(g_atom); - unsigned sz = m_cubes.size(); - for (unsigned i = 0; i < sz; ++i) { - m_cubes.push_back(m_cubes[i]); // push copy of m_cubes[i] - m_cubes.back().push_back(m.mk_not(g_atom)); // add ¬p to the copy - m_cubes[i].push_back(g_atom); // add p to the original - } + add_split_atom(g_atom, 0); // add ¬p to all cubes in m_cubes } } @@ -318,15 +362,25 @@ namespace smt { return top_lits; } - lbool parallel::new_check(expr_ref_vector const& asms) { + void parallel::batch_manager::initialize() { + m_state = state::is_running; + m_cubes.reset(); + m_cubes.push_back(expr_ref_vector(m)); // push empty cube + m_split_atoms.reset(); + } + + lbool parallel::operator()(expr_ref_vector const& asms) { ast_manager& m = ctx.m; if (m.has_trace_stream()) throw default_exception("trace streams have to be off in parallel mode"); { + m_batch_manager.initialize(); + m_workers.reset(); scoped_limits sl(m.limit()); unsigned num_threads = std::min((unsigned)std::thread::hardware_concurrency(), ctx.get_fparams().m_threads); + flet _nt(ctx.m_fparams.m_threads, 1); SASSERT(num_threads > 1); for (unsigned i = 0; i < num_threads; ++i) m_workers.push_back(alloc(worker, i, *this, asms)); // i.e. "new worker(i, *this, asms)" @@ -356,12 +410,7 @@ namespace smt { m_workers.clear(); return m_batch_manager.get_result(); } - - lbool parallel::operator()(expr_ref_vector const& asms) { - std::cout << "Parallel solving with " << asms.size() << " assumptions." << std::endl; - flet _nt(ctx.m_fparams.m_threads, 1); - return new_check(asms); - } + } #endif diff --git a/src/smt/smt_parallel.h b/src/smt/smt_parallel.h index 0892d81e1..44dc1400e 100644 --- a/src/smt/smt_parallel.h +++ b/src/smt/smt_parallel.h @@ -29,19 +29,21 @@ namespace smt { class batch_manager { - enum exception_kind { - NO_EX, - ERROR_CODE_EX, - ERROR_MSG_EX + enum state { + is_running, + is_sat, + is_unsat, + is_exception_msg, + is_exception_code }; + ast_manager& m; parallel& p; std::mutex mux; + state m_state = state::is_running; expr_ref_vector m_split_atoms; // atoms to split on vector m_cubes; - lbool m_result = l_false; // want states: init/undef, canceled/exception, sat, unsat unsigned m_max_batch_size = 10; - exception_kind m_exception_kind = NO_EX; unsigned m_exception_code = 0; std::string m_exception_msg; @@ -52,7 +54,10 @@ namespace smt { } public: - batch_manager(ast_manager& m, parallel& p) : m(m), p(p), m_split_atoms(m) { m_cubes.push_back(expr_ref_vector(m)); } + batch_manager(ast_manager& m, parallel& p) : m(m), p(p), m_split_atoms(m) { } + + void initialize(); + void set_unsat(ast_translation& l2g, expr_ref_vector const& unsat_core); void set_sat(ast_translation& l2g, model& m); void set_exception(std::string const& msg); @@ -104,8 +109,6 @@ namespace smt { batch_manager m_batch_manager; ptr_vector m_workers; - lbool new_check(expr_ref_vector const& asms); - public: parallel(context& ctx) : ctx(ctx),