3
0
Fork 0
mirror of https://github.com/Z3Prover/z3 synced 2025-10-21 23:00:32 +00:00

Revert "Parallel solving (#7775)" (#7777)

This reverts commit c8e866f568.
This commit is contained in:
Nikolaj Bjorner 2025-08-14 18:16:35 -07:00 committed by GitHub
parent 1e7832a391
commit e24a5b6624
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 224 additions and 1054 deletions

View file

@ -1,218 +0,0 @@
# Parallel project notes
We track notes for updates to
[smt/parallel.cpp](https://github.com/Z3Prover/z3/blob/master/src/smt/smt_parallel.cpp)
and possibly
[solver/parallel_tactic.cpp](https://github.com/Z3Prover/z3/blob/master/src/solver/parallel_tactical.cpp).
## Variable selection heuristics
* Lookahead solvers:
* lookahead in the smt directory performs a simplistic lookahead search using unit propagation.
* lookahead in the sat directory uses custom lookahead solver based on MARCH. March is described in Handbook of SAT and Knuth volumne 4.
* They both proxy on a cost model where the most useful variable to branch on is the one that _minimizes_ the set of new clauses maximally
through unit propagation. In other words, if a literal _p_ is set to true, and _p_ occurs in clause $\neg p \vee q \vee r$, then it results in
reducing the clause from size 3 to 2 (because $\neg p$ will be false after propagating _p_).
* Selected references: SAT handbook, Knuth Volumne 4, Marijn's March solver on github, [implementation of march in z3](https://github.com/Z3Prover/z3/blob/master/src/sat/sat_lookahead.cpp)
* VSIDS:
* As referenced in Matteo and Antti's solvers.
* Variable activity is a proxy for how useful it is to case split on a variable during search. Variables with a higher VSIDS are split first.
* VSIDS is updated dynamically during search. It was introduced in the paper with Moscovitz, Malik, et al in early 2000s. A good overview is in Armin's tutorial slides (also in my overview of SMT).
* VSIDS does not keep track of variable phases (if the variable was set to true or false).
* Selected refernces [DAC 2001](https://www.princeton.edu/~chaff/publication/DAC2001v56.pdf) and [Biere Tutorial, slide 64 on Variable Scoring Schemes](https://alexeyignatiev.github.io/ssa-school-2019/slides/ab-satsmtar19-slides.pdf)
* Proof prefix:
* Collect the literals that occur in learned clauses. Count their occurrences based on polarity. This gets tracked in a weighted score.
* The weight function can be formulated to take into account clause sizes.
* The score assignment may also decay similar to VSIDS.
* We could also use a doubly linked list for literals used in conflicts and keep reinsert literals into the list when they are used. This would be a "Variable move to front" (VMTF) variant.
* Selected references: [Battleman et al](https://www.cs.cmu.edu/~mheule/publications/proofix-SAT25.pdf)
* From local search:
* Note also that local search solvers can be used to assign variable branch priorities.
* We are not going to directly run a local search solver in the mix up front, but let us consider this heuristic for completeness.
* The heuristic is documented in Biere and Cai's journal paper on integrating local search for CDCL.
* Roughly, it considers clauses that move from the UNSAT set to the SAT set of clauses. It then keeps track of the literals involved.
* Selected references: [Cai et al](https://www.jair.org/index.php/jair/article/download/13666/26833/)
* Assignment trails:
* We could also consider the assignments to variables during search.
* Variables that are always assigned to the same truth value could be considered to be safe to assign that truth value.
* The cubes resulting from such variables might be a direction towards finding satisfying solutions.
* Selected references: [Alex and Vadim](https://link.springer.com/chapter/10.1007/978-3-319-94144-8_7) and most recently [Robin et al](https://drops.dagstuhl.de/entities/document/10.4230/LIPIcs.SAT.2024.9).
## Algorithms
This section considers various possible algorithms.
In the following, $F$ refers to the original goal, $T$ is the number of CPU cores or CPU threads.
### Base algorithm
The existing algorithm in <b>smt_parallel</b> is as follows:
1. Run a solver on $F$ with a bounded number of conflicts.
2. If the result is SAT/UNSAT, or UNKNOWN with an interrupt or timeout, return. If the maximal number of conflicts were reached continue.
3. Spawn $T$ solvers on $F$ with a bounded number of conflicts, wait until a thread returns UNSAT/SAT or all threads have reached a maximal number of conflicts.
4. Perform a similar check as in 2.
5. Share unit literals learned by each thread.
6. Compute unit cubes for each thread $T$.
7. Spawn $T$ solvers with $F \wedge \ell$, where $\ell$ is a unit literal determined by lookahead function in each thread.
8. Perform a similar check as in 2. But note that a thread can be UNSAT because the unit cube $\ell$ contradicted $F$. In this case learn the unit literal $\neg \ell$.
9. Shared unit literals learned by each thread, increase the maximal number of conflicts, go to 3.
### Algorithm Variants
* Instead of using lookahead solving to find unit cubes use the proof-prefix based scoring function.
* Instead of using independent unit cubes, perform a systematic (where systematic can mean many things) cube and conquer strategy.
* Spawn some threads to work in "SAT" mode, tuning to find models instead of short resolution proofs.
* Change the synchronization barrier discipline.
* [Future] Include in-processing
### Cube and Conquer strategy
We could maintain a global decomposition of the search space by maintaing a list of _cubes_.
Initially, the list of cubes has just one element, the cube with no literals $[ [] ]$.
By using a list of cubes instead of a _set_ of cubes we can refer to an ordering.
For example, cubes can be ordered by a suffix traversal of the _cube tree_ (the tree formed by
case splitting on the first literal, children of the _true_ branch are the cubes where the first
literal is true, children of the _false_ branch are the cubes where the first literal is false).
The main question is going to be how the cube decomposition is created.
#### Static cubing
We can aim for a static cube strategy that uses a few initial (concurrent) probes to find cube literals.
This strategy would be a parallel implementaiton of proof-prefix approach. The computed cubes are inserted
into the list of cubes and the list is consumed by a second round.
#### Growing cubes on demand
Based on experiences with cubing so far, there is high variance in how easy cubes are to solve.
Some cubes will be harder than others to solve. For hard cubes, it is tempting to develop a recursive
cubing strategy. Ideally, a recursive cubing strategy is symmetric to top-level cubing.
* The solver would have to identify hard cubes vs. easy cubes.
* It would have to know when to stop working on a hard cube and replace it in the list of cubes by
a new list of sub-cubes.
* Ideally, we don't need any static cubing and cubing is grown on demand while all threads are utilized.
* If we spawn $T$ threads to initially work with empty cubes, we could extract up to $T$ indepenent cubes
by examining the proof-prefix of their traces. This can form the basis for the first, up to $2^T$ cubes.
* After a round of solving with each thread churning on some cubes, we may obtain more proof-prefixes from
_hard_ cubes. It is not obvious that we want to share cubes from different proof prefixes at this point.
But a starting point is to split a hard cube into two by using the proof-prefix from attempting to solve it.
* Suppose we take the proof-prefix sampling algorithm at heart: It says to start with some initial cube prefix
and then sample for other cube literals. If we translate it to the case where multiple cubes are being processed
in parallel, then an analogy is to share candidates for new cube literals among cubes that are close to each-other.
For example, if thread $t_1$ processes cube $a, b, c$ and $t_2$ processes $a,b, \neg c$. They are close. They are only
separated by Hamming distance 1. If $t_1$ finds cube literal $d$ and $t_2$ finds cube literal $e$, we could consider the cubes
$a, b, c, d, e$, and $a, b, c, d, \neg e$, $\ldots$, $a, b, \neg c, \neg d, \neg e$.
#### Representing cubes implicitly
We can represent a list of cubes by using intervals and only represent start and end-points of the intervals.
#### Batching
Threads can work on more than one cube in a batch.
### Synchronization
* The first thread to time out or finish could kill other threads instead of joining on all threads to finish.
* Instead of synchronization barriers have threads continue concurrently without terminating. They synchronize on signals and new units. This is trickier to implement, but in some guises accomplished in [sat/sat_parallel.cpp](https://github.com/Z3Prover/z3/blob/master/src/sat/sat_parallel.cpp)
## Parameter tuning
The idea is to have parallel threads try out different parameter settings and search the parameter space of an optimal parameter setting.
Let us assume that there is a set of tunable parameters $P$. The set comprises of a set of named parameters with initial values.
$P = \{ (p_1, v_1), \ldots, (p_n, v_n) \}$.
With each parameter associate a set of mutation functions $+=, -=, *=$, such as increment, decrement, scale a parameter by a non-negative multiplier (which can be less than 1).
We will initialize a search space of parameter settings by parameters, values and mutation functions that have assigned reward values. The reward value is incremented
if a parameter mutation step results in an improvement, and decremented if a mutation step degrades performance.
$P = \{ (p_1, v_1, \{ (r_{11}, m_{11}), \ldots, (r_{1k_1}, m_{1k_1}) \}), \ldots, (p_n, v_n, \{ (r_{n1}, m_{n1}), \ldots, (r_{nk_n}, m_{nk_n})\}) \}$.
The initial values of reward functions is fixed (to 1) and the initial values of parameters are the defaults.
* The batch manager maintains a set of candidate parameters $CP = \{ (P_1, r_1), \ldots, (P_n, r_n) \}$.
* A worker thread picks up a parameter $P_i$ from $CP$ from the batch manager.
* It picks one or more parameter settings within $P_i$ whose mutation function have non-zero reward functions and applies a mutation.
* It then runs with a batch of cubes.
* It measures the reward for the new parameter setting based in number of cubes, cube depth, number of timeouts, and completions with number of conflicts.
* If the new reward is an improvement over $(P_i, r_i)$ it inserts the new parameter setting $(P_i', r_i')$ into the batch manager.
* The batch manager discards the worst parameter settings keeping the top $K$ ($K = 5$) parameter settings.
When picking among mutation steps with reward functions use a weighted sampling algorithm.
Weighted sampling works as follows: You are given a set of items with weights $(i_1, w_1), \ldots, (i_k, w_k)$.
Add $w = \sum_j w_j$. Pick a random number $w_0$ in the range $0\ldots w$.
Then you pick item $i_n$ such that $n$ is the smallest index with $\sum_{j = 1}^n w_j \geq w_0$.
SMT parameters that could be tuned:
<pre>
arith.bprop_on_pivoted_rows (bool) (default: true)
arith.branch_cut_ratio (unsigned int) (default: 2)
arith.eager_eq_axioms (bool) (default: true)
arith.enable_hnf (bool) (default: true)
arith.greatest_error_pivot (bool) (default: false)
arith.int_eq_branch (bool) (default: false)
arith.min (bool) (default: false)
arith.nl.branching (bool) (default: true)
arith.nl.cross_nested (bool) (default: true)
arith.nl.delay (unsigned int) (default: 10)
arith.nl.expensive_patching (bool) (default: false)
arith.nl.expp (bool) (default: false)
arith.nl.gr_q (unsigned int) (default: 10)
arith.nl.grobner (bool) (default: true)
arith.nl.grobner_cnfl_to_report (unsigned int) (default: 1)
arith.nl.grobner_eqs_growth (unsigned int) (default: 10)
arith.nl.grobner_expr_degree_growth (unsigned int) (default: 2)
arith.nl.grobner_expr_size_growth (unsigned int) (default: 2)
arith.nl.grobner_frequency (unsigned int) (default: 4)
arith.nl.grobner_max_simplified (unsigned int) (default: 10000)
arith.nl.grobner_row_length_limit (unsigned int) (default: 10)
arith.nl.grobner_subs_fixed (unsigned int) (default: 1)
arith.nl.horner (bool) (default: true)
arith.nl.horner_frequency (unsigned int) (default: 4)
arith.nl.horner_row_length_limit (unsigned int) (default: 10)
arith.nl.horner_subs_fixed (unsigned int) (default: 2)
arith.nl.nra (bool) (default: true)
arith.nl.optimize_bounds (bool) (default: true)
arith.nl.order (bool) (default: true)
arith.nl.propagate_linear_monomials (bool) (default: true)
arith.nl.rounds (unsigned int) (default: 1024)
arith.nl.tangents (bool) (default: true)
arith.propagate_eqs (bool) (default: true)
arith.propagation_mode (unsigned int) (default: 1)
arith.random_initial_value (bool) (default: false)
arith.rep_freq (unsigned int) (default: 0)
arith.simplex_strategy (unsigned int) (default: 0)
dack (unsigned int) (default: 1)
dack.eq (bool) (default: false)
dack.factor (double) (default: 0.1)
dack.gc (unsigned int) (default: 2000)
dack.gc_inv_decay (double) (default: 0.8)
dack.threshold (unsigned int) (default: 10)
delay_units (bool) (default: false)
delay_units_threshold (unsigned int) (default: 32)
dt_lazy_splits (unsigned int) (default: 1)
lemma_gc_strategy (unsigned int) (default: 0)
phase_caching_off (unsigned int) (default: 100)
phase_caching_on (unsigned int) (default: 400)
phase_selection (unsigned int) (default: 3)
qi.eager_threshold (double) (default: 10.0)
qi.lazy_threshold (double) (default: 20.0)
qi.quick_checker (unsigned int) (default: 0)
relevancy (unsigned int) (default: 2)
restart_factor (double) (default: 1.1)
restart_strategy (unsigned int) (default: 1)
seq.max_unfolding (unsigned int) (default: 1000000000)
seq.min_unfolding (unsigned int) (default: 1)
seq.split_w_len (bool) (default: true)
</pre>

View file

@ -1,32 +0,0 @@
#!/bin/bash
# run from inside ./z3/build
Z3=./z3
OPTIONS="-v:0 -st smt.threads=4"
OUT_FILE="../z3_results.txt"
BASE_PATH="../../z3-poly-testing/inputs/"
# List of relative test files (relative to BASE_PATH)
REL_TEST_FILES=(
"QF_NIA_small/Ton_Chanh_15__Singapore_v1_false-termination.c__p27381_terminationG_0.smt2"
"QF_UFDTLIA_SAT/52759_bec3a2272267494faeecb6bfaf253e3b_10_QF_UFDTLIA.smt2"
)
# Clear output file
> "$OUT_FILE"
# Loop through and run Z3 on each file
for rel_path in "${REL_TEST_FILES[@]}"; do
full_path="$BASE_PATH$rel_path"
test_name="$rel_path"
echo "Running: $test_name"
echo "===== $test_name =====" | tee -a "$OUT_FILE"
# Run Z3 and pipe output to both screen and file
$Z3 "$full_path" $OPTIONS 2>&1 | tee -a "$OUT_FILE"
echo "" | tee -a "$OUT_FILE"
done
echo "Results written to $OUT_FILE"

View file

@ -5153,8 +5153,6 @@ namespace polynomial {
//
unsigned sz = R->size();
for (unsigned i = 0; i < sz; i++) {
if (sz > 100 && i % 100 == 0)
checkpoint();
monomial * m = R->m(i);
numeral const & a = R->a(i);
if (m->degree_of(x) == deg_R) {
@ -5573,7 +5571,6 @@ namespace polynomial {
h = mk_one();
while (true) {
checkpoint();
TRACE(resultant, tout << "A: " << A << "\nB: " << B << "\n";);
degA = degree(A, x);
degB = degree(B, x);

View file

@ -1,191 +0,0 @@
// SOURCE: https://github.com/Ten0/updatable_priority_queue/blob/master/updatable_priority_queue.h
#include <utility>
#include <vector>
namespace updatable_priority_queue {
template <typename Key, typename Priority>
struct priority_queue_node {
Priority priority;
Key key;
priority_queue_node(const Key& key, const Priority& priority) : priority(priority), key(key) {}
friend bool operator<(const priority_queue_node& pqn1, const priority_queue_node& pqn2) {
return pqn1.priority > pqn2.priority;
}
friend bool operator>(const priority_queue_node& pqn1, const priority_queue_node& pqn2) {
return pqn1.priority < pqn2.priority;
}
};
/** Key has to be an uint value (convertible to size_t)
* This is a max heap (max is on top), to match stl's pQ */
template <typename Key, typename Priority>
class priority_queue {
protected:
std::vector<size_t> id_to_heappos;
std::vector<priority_queue_node<Key,Priority>> heap;
std::size_t max_size = 4; // std::numeric_limits<std::size_t>::max(); // Create a variable max_size that defaults to the largest size_t value possible
public:
// priority_queue() {}
priority_queue(std::size_t max_size = std::numeric_limits<std::size_t>::max()): max_size(max_size) {}
// Returns a const reference to the internal heap storage
const std::vector<priority_queue_node<Key, Priority>>& get_heap() const {
return heap;
}
bool empty() const { return heap.empty(); }
std::size_t size() const { return heap.size(); }
/** first is priority, second is key */
const priority_queue_node<Key,Priority>& top() const { return heap.front(); }
void pop(bool remember_key=false) {
if(size() == 0) return;
id_to_heappos[heap.front().key] = -1-remember_key;
if(size() > 1) {
*heap.begin() = std::move(*(heap.end()-1));
id_to_heappos[heap.front().key] = 0;
}
heap.pop_back();
sift_down(0);
}
priority_queue_node<Key,Priority> pop_value(bool remember_key=true) {
if(size() == 0) return priority_queue_node<Key, Priority>(-1, Priority());
priority_queue_node<Key,Priority> ret = std::move(*heap.begin());
id_to_heappos[ret.key] = -1-remember_key;
if(size() > 1) {
*heap.begin() = std::move(*(heap.end()-1));
id_to_heappos[heap.front().key] = 0;
}
heap.pop_back();
sift_down(0);
return ret;
}
/** Sets the priority for the given key. If not present, it will be added, otherwise it will be updated
* Returns true if the priority was changed.
* */
bool set(const Key& key, const Priority& priority, bool only_if_higher=false) {
if(key < id_to_heappos.size() && id_to_heappos[key] < ((size_t)-2)) // This key is already in the pQ
return update(key, priority, only_if_higher);
else
return push(key, priority, only_if_higher);
}
std::pair<bool,Priority> get_priority(const Key& key) {
if(key < id_to_heappos.size()) {
size_t pos = id_to_heappos[key];
if(pos < ((size_t)-2)) {
return {true, heap[pos].priority};
}
}
return {false, 0};
}
/** Returns true if the key was not inside and was added, otherwise does nothing and returns false
* If the key was remembered and only_if_unknown is true, does nothing and returns false
* */
bool push(const Key& key, const Priority& priority, bool only_if_unknown = false) {
extend_ids(key);
if (id_to_heappos[key] < ((size_t)-2)) return false; // already inside
if (only_if_unknown && id_to_heappos[key] == ((size_t)-2)) return false; // was evicted and only_if_unknown prevents re-adding
if (heap.size() < max_size) {
// We have room: just add new element
size_t n = heap.size();
id_to_heappos[key] = n;
heap.emplace_back(key, priority);
sift_up(n);
return true;
} else {
// Heap full: heap[0] is the smallest priority in the top-k (min-heap)
if (priority <= heap[0].priority) {
// New element priority too small or equal, discard it
return false;
}
// Evict smallest element at heap[0]
Key evicted_key = heap[0].key;
id_to_heappos[evicted_key] = -2; // Mark evicted
heap[0] = priority_queue_node<Key, Priority>(key, priority);
id_to_heappos[key] = 0;
sift_down(0); // restore min-heap property
return true;
}
}
/** Returns true if the key was already inside and was updated, otherwise does nothing and returns false */
bool update(const Key& key, const Priority& new_priority, bool only_if_higher=false) {
if(key >= id_to_heappos.size()) return false;
size_t heappos = id_to_heappos[key];
if(heappos >= ((size_t)-2)) return false;
Priority& priority = heap[heappos].priority;
if(new_priority > priority) {
priority = new_priority;
sift_up(heappos);
return true;
}
else if(!only_if_higher && new_priority < priority) {
priority = new_priority;
sift_down(heappos);
return true;
}
return false;
}
void clear() {
heap.clear();
id_to_heappos.clear();
}
private:
void extend_ids(Key k) {
size_t new_size = k+1;
if(id_to_heappos.size() < new_size)
id_to_heappos.resize(new_size, -1);
}
void sift_down(size_t heappos) {
size_t len = heap.size();
size_t child = heappos*2+1;
if(len < 2 || child >= len) return;
if(child+1 < len && heap[child+1] > heap[child]) ++child; // Check whether second child is higher
if(!(heap[child] > heap[heappos])) return; // Already in heap order
priority_queue_node<Key,Priority> val = std::move(heap[heappos]);
do {
heap[heappos] = std::move(heap[child]);
id_to_heappos[heap[heappos].key] = heappos;
heappos = child;
child = 2*child+1;
if(child >= len) break;
if(child+1 < len && heap[child+1] > heap[child]) ++child;
} while(heap[child] > val);
heap[heappos] = std::move(val);
id_to_heappos[heap[heappos].key] = heappos;
}
void sift_up(size_t heappos) {
size_t len = heap.size();
if(len < 2 || heappos <= 0) return;
size_t parent = (heappos-1)/2;
if(!(heap[heappos] > heap[parent])) return;
priority_queue_node<Key, Priority> val = std::move(heap[heappos]);
do {
heap[heappos] = std::move(heap[parent]);
id_to_heappos[heap[heappos].key] = heappos;
heappos = parent;
if(heappos <= 0) break;
parent = (parent-1)/2;
} while(val > heap[parent]);
heap[heappos] = std::move(val);
id_to_heappos[heap[heappos].key] = heappos;
}
};
}

View file

@ -50,8 +50,6 @@ Revision History:
#include "model/model.h"
#include "solver/progress_callback.h"
#include "solver/assertions/asserted_formulas.h"
#include "smt/priority_queue.h"
#include "util/dlist.h"
#include <tuple>
// there is a significant space overhead with allocating 1000+ contexts in
@ -191,17 +189,6 @@ namespace smt {
unsigned_vector m_lit_occs; //!< occurrence count of literals
svector<bool_var_data> m_bdata; //!< mapping bool_var -> data
svector<double> m_activity;
updatable_priority_queue::priority_queue<bool_var, double> m_pq_scores;
struct lit_node : dll_base<lit_node> {
literal lit;
lit_node(literal l) : lit(l) { init(this); }
};
lit_node* m_dll_lits;
// svector<std::array<double, 2>> m_lit_scores;
svector<double> m_lit_scores[2];
clause_vector m_aux_clauses;
clause_vector m_lemmas;
vector<clause_vector> m_clauses_to_reinit;
@ -946,17 +933,6 @@ namespace smt {
ast_pp_util m_lemma_visitor;
void dump_lemma(unsigned n, literal const* lits);
void dump_axiom(unsigned n, literal const* lits);
void add_scores(unsigned n, literal const* lits);
void reset_scores() {
for (auto& e : m_lit_scores[0])
e = 0;
for (auto& e : m_lit_scores[1])
e = 0;
m_pq_scores.clear(); // Clear the priority queue heap as well
}
double get_score(literal l) const {
return m_lit_scores[l.sign()][l.var()];
}
public:
void ensure_internalized(expr* e);

View file

@ -931,10 +931,6 @@ namespace smt {
set_bool_var(id, v);
m_bdata.reserve(v+1);
m_activity.reserve(v+1);
m_lit_scores[0].reserve(v + 1);
m_lit_scores[1].reserve(v + 1);
m_lit_scores[0][v] = m_lit_scores[1][v] = 0.0;
m_bool_var2expr.reserve(v+1);
m_bool_var2expr[v] = n;
literal l(v, false);
@ -1423,7 +1419,6 @@ namespace smt {
break;
case CLS_LEARNED:
dump_lemma(num_lits, lits);
add_scores(num_lits, lits);
break;
default:
break;
@ -1532,27 +1527,6 @@ namespace smt {
}}
}
// void context::add_scores(unsigned n, literal const* lits) {
// for (unsigned i = 0; i < n; ++i) {
// auto lit = lits[i];
// unsigned v = lit.var();
// m_lit_scores[v][lit.sign()] += 1.0 / n;
// }
// }
void context::add_scores(unsigned n, literal const* lits) {
for (unsigned i = 0; i < n; ++i) {
auto lit = lits[i];
unsigned v = lit.var(); // unique key per literal
m_lit_scores[lit.sign()][v] += 1.0 / n;
auto new_score = m_lit_scores[0][v] * m_lit_scores[1][v];
m_pq_scores.set(v, new_score);
}
}
void context::dump_axiom(unsigned n, literal const* lits) {
if (m_fparams.m_axioms2files) {
literal_buffer tmp;

View file

@ -30,13 +30,11 @@ namespace smt {
struct compare;
// double get_score();
double get_score();
void choose_rec(expr_ref_vector& trail, expr_ref_vector& result, unsigned depth, unsigned budget);
public:
double get_score();
lookahead(context& ctx);
expr_ref choose(unsigned budget = 2000);

View file

@ -36,463 +36,237 @@ namespace smt {
#else
#include <thread>
#include <cassert>
namespace smt {
void parallel::worker::run() {
ast_translation g2l(p.ctx.m, m); // global to local context -- MUST USE p.ctx.m, not ctx->m, AS GLOBAL MANAGER!!!
ast_translation l2g(m, p.ctx.m); // local to global context
while (m.inc()) { // inc: increase the limit and check if it is canceled, vs m.limit().is_canceled() is readonly. the .limit() is also not necessary (m.inc() etc provides a convenience wrapper)
vector<expr_ref_vector> cubes;
b.get_cubes(g2l, cubes);
if (cubes.empty())
return;
collect_shared_clauses(g2l);
for (auto& cube : cubes) {
if (!m.inc()) {
b.set_exception("context cancelled");
return;
}
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " cube: " << mk_bounded_pp(mk_and(cube), m, 3) << "\n");
lbool r = check_cube(cube);
if (m.limit().is_canceled()) {
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " context cancelled\n");
return;
}
switch (r) {
case l_undef: {
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " found undef cube\n");
// return unprocessed cubes to the batch manager
// add a split literal to the batch manager.
// optionally process other cubes and delay sending back unprocessed cubes to batch manager.
vector<expr_ref_vector> returned_cubes;
returned_cubes.push_back(cube);
auto split_atoms = get_split_atoms();
b.return_cubes(l2g, returned_cubes, split_atoms);
update_max_thread_conflicts();
break;
}
case l_true: {
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " found sat cube\n");
model_ref mdl;
ctx->get_model(mdl);
b.set_sat(l2g, *mdl);
return;
}
case l_false: {
// if unsat core only contains (external) assumptions (i.e. all the unsat core are asms), then unsat and return as this does NOT depend on cubes
// otherwise, extract lemmas that can be shared (units (and unsat core?)).
// share with batch manager.
// process next cube.
expr_ref_vector const& unsat_core = ctx->unsat_core();
IF_VERBOSE(1, verbose_stream() << "unsat core: " << unsat_core << "\n");
// If the unsat core only contains assumptions,
// unsatisfiability does not depend on the current cube and the entire problem is unsat.
if (all_of(unsat_core, [&](expr* e) { return asms.contains(e); })) {
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " determined formula unsat\n");
b.set_unsat(l2g, unsat_core);
return;
}
for (expr* e : unsat_core)
if (asms.contains(e))
b.report_assumption_used(l2g, e); // report assumptions used in unsat core, so they can be used in final core
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " found unsat cube\n");
b.collect_clause(l2g, id, mk_not(mk_and(unsat_core)));
break;
}
}
}
share_units(l2g);
}
}
parallel::worker::worker(unsigned id, parallel& p, expr_ref_vector const& _asms): id(id), p(p), b(p.m_batch_manager), m_smt_params(p.ctx.get_fparams()), asms(m) {
ast_translation g2l(p.ctx.m, m);
for (auto e : _asms)
asms.push_back(g2l(e));
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " created with " << asms.size() << " assumptions\n");
m_smt_params.m_preprocess = false;
ctx = alloc(context, m, m_smt_params, p.ctx.get_params());
context::copy(p.ctx, *ctx, true);
ctx->set_random_seed(id + m_smt_params.m_random_seed);
m_max_thread_conflicts = ctx->get_fparams().m_threads_max_conflicts;
m_max_conflicts = ctx->get_fparams().m_max_conflicts;
}
void parallel::worker::share_units(ast_translation& l2g) {
// Collect new units learned locally by this worker and send to batch manager
unsigned sz = ctx->assigned_literals().size();
for (unsigned j = m_num_shared_units; j < sz; ++j) { // iterate only over new literals since last sync
literal lit = ctx->assigned_literals()[j];
expr_ref e(ctx->bool_var2expr(lit.var()), ctx->m); // turn literal into a Boolean expression
if (lit.sign())
e = m.mk_not(e); // negate if literal is negative
b.collect_clause(l2g, id, e);
}
m_num_shared_units = sz;
}
void parallel::batch_manager::collect_clause(ast_translation& l2g, unsigned source_worker_id, expr* clause) {
std::scoped_lock lock(mux);
expr* g_clause = l2g(clause);
if (!shared_clause_set.contains(g_clause)) {
shared_clause_set.insert(g_clause);
shared_clause sc{source_worker_id, expr_ref(g_clause, m)};
shared_clause_trail.push_back(sc);
}
}
void parallel::worker::collect_shared_clauses(ast_translation& g2l) {
expr_ref_vector new_clauses = b.return_shared_clauses(g2l, m_shared_clause_limit, id); // get new clauses from the batch manager
// iterate over new clauses and assert them in the local context
for (expr* e : new_clauses) {
expr_ref local_clause(e, g2l.to()); // e was already translated to the local context in the batch manager!!
ctx->assert_expr(local_clause); // assert the clause in the local context
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " asserting shared clause: " << mk_bounded_pp(local_clause, m, 3) << "\n");
}
}
// get new clauses from the batch manager and assert them in the local context
expr_ref_vector parallel::batch_manager::return_shared_clauses(ast_translation& g2l, unsigned& worker_limit, unsigned worker_id) {
std::scoped_lock lock(mux);
expr_ref_vector result(g2l.to());
for (unsigned i = worker_limit; i < shared_clause_trail.size(); ++i) {
if (shared_clause_trail[i].source_worker_id == worker_id)
continue; // skip clauses from the requesting worker
result.push_back(g2l(shared_clause_trail[i].clause.get()));
}
worker_limit = shared_clause_trail.size(); // update the worker limit to the end of the current trail
return result;
}
lbool parallel::worker::check_cube(expr_ref_vector const& cube) {
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " checking cube\n";);
for (auto& atom : cube)
asms.push_back(atom);
lbool r = l_undef;
ctx->get_fparams().m_max_conflicts = std::min(m_max_thread_conflicts, m_max_conflicts);
try {
r = ctx->check(asms.size(), asms.data());
}
catch (z3_error& err) {
b.set_exception(err.error_code());
}
catch (z3_exception& ex) {
b.set_exception(ex.what());
}
catch (...) {
b.set_exception("unknown exception");
}
asms.shrink(asms.size() - cube.size());
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " DONE checking cube " << r << "\n";);
return r;
}
void parallel::batch_manager::get_cubes(ast_translation& g2l, vector<expr_ref_vector>& cubes) {
std::scoped_lock lock(mux);
if (m_cubes.size() == 1 && m_cubes[0].size() == 0) {
// special initialization: the first cube is emtpy, have the worker work on an empty cube.
cubes.push_back(expr_ref_vector(g2l.to()));
return;
}
for (unsigned i = 0; i < std::min(m_max_batch_size / p.num_threads, (unsigned)m_cubes.size()) && !m_cubes.empty(); ++i) {
auto& cube = m_cubes.back();
expr_ref_vector l_cube(g2l.to());
for (auto& e : cube) {
l_cube.push_back(g2l(e));
}
cubes.push_back(l_cube);
m_cubes.pop_back();
}
}
void parallel::batch_manager::set_sat(ast_translation& l2g, model& m) {
std::scoped_lock lock(mux);
if (m_state != state::is_running)
return;
m_state = state::is_sat;
p.ctx.set_model(m.translate(l2g));
cancel_workers();
}
void parallel::batch_manager::set_unsat(ast_translation& l2g, expr_ref_vector const& unsat_core) {
std::scoped_lock lock(mux);
if (m_state != state::is_running)
return;
m_state = state::is_unsat;
// every time we do a check_sat call, don't want to have old info coming from a prev check_sat call
// the unsat core gets reset internally in the context after each check_sat, so we assert this property here
// takeaway: each call to check_sat needs to have a fresh unsat core
SASSERT(p.ctx.m_unsat_core.empty());
for (expr* e : unsat_core)
p.ctx.m_unsat_core.push_back(l2g(e));
cancel_workers();
}
void parallel::batch_manager::set_exception(unsigned error_code) {
std::scoped_lock lock(mux);
if (m_state != state::is_running)
return;
m_state = state::is_exception_code;
m_exception_code = error_code;
cancel_workers();
}
void parallel::batch_manager::set_exception(std::string const& msg) {
std::scoped_lock lock(mux);
if (m_state != state::is_running || m.limit().is_canceled())
return;
m_state = state::is_exception_msg;
m_exception_msg = msg;
cancel_workers();
}
void parallel::batch_manager::report_assumption_used(ast_translation& l2g, expr* assumption) {
std::scoped_lock lock(mux);
p.m_assumptions_used.insert(l2g(assumption));
}
lbool parallel::batch_manager::get_result() const {
if (m.limit().is_canceled())
return l_undef; // the main context was cancelled, so we return undef.
switch (m_state) {
case state::is_running: // batch manager is still running, but all threads have processed their cubes, which means all cubes were unsat
if (!m_cubes.empty())
throw default_exception("inconsistent end state");
if (!p.m_assumptions_used.empty()) {
// collect unsat core from assumptions used, if any --> case when all cubes were unsat, but depend on nonempty asms, so we need to add these asms to final unsat core
SASSERT(p.ctx.m_unsat_core.empty());
for (auto a : p.m_assumptions_used)
p.ctx.m_unsat_core.push_back(a);
}
return l_false;
case state::is_unsat:
return l_false;
case state::is_sat:
return l_true;
case state::is_exception_msg:
throw default_exception(m_exception_msg.c_str());
case state::is_exception_code:
throw z3_error(m_exception_code);
default:
UNREACHABLE();
return l_undef;
}
}
/*
Batch manager maintains C_batch, A_batch.
C_batch - set of cubes
A_batch - set of split atoms.
return_cubes is called with C_batch A_batch C A.
C_worker - one or more cubes
A_worker - split atoms form the worker thread.
Assumption: A_worker does not occur in C_worker.
------------------------------------------------------------------------------------------------------------------------------------------------------------
Greedy strategy:
For each returned cube c from the worker, you split it on all split atoms not in it (i.e., A_batch \ atoms(c)), plus any new atoms from A_worker.
For each existing cube in the batch, you also split it on the new atoms from A_worker.
return_cubes C_batch A_batch C_worker A_worker:
C_batch <- { cube * 2^(A_worker u (A_batch \ atoms(cube)) | cube in C_worker } u
{ cube * 2^(A_worker \ A_batch) | cube in C_batch }
=
let C_batch' = C_batch u { cube * 2^(A_batch \ atoms(cube)) | cube in C_worker }
in { cube * 2^(A_worker \ A_batch) | cube in C_batch' }
A_batch <- A_batch u A_worker
------------------------------------------------------------------------------------------------------------------------------------------------------------
Frugal strategy: only split on worker cubes
case 1: thread returns no cubes, just atoms: just create 2^k cubes from all combinations of atoms so far.
return_cubes C_batch A_batch [[]] A_worker:
C_batch <- C_batch u 2^(A_worker u A_batch),
A_batch <- A_batch u A_worker
case 2: thread returns both cubes and atoms
Only the returned cubes get split by the newly discovered atoms (A_worker). Existing cubes are not touched.
return_cubes C_batch A_batch C_worker A_worker:
C_batch <- C_batch u { cube * 2^A_worker | cube in C_worker }.
A_batch <- A_batch u A_worker
This means:
Only the returned cubes get split by the newly discovered atoms (A_worker).
Existing cubes are not touched.
------------------------------------------------------------------------------------------------------------------------------------------------------------
Hybrid: Between Frugal and Greedy: (generalizes the first case of empty cube returned by worker) -- don't focus on this approach
i.e. Expand only the returned cubes, but allow them to be split on both new and old atoms not already in them.
C_batch <- C_batch u { cube * 2^(A_worker u (A_batch \ atoms(cube)) | cube in C_worker }
A_batch <- A_batch u A_worker
------------------------------------------------------------------------------------------------------------------------------------------------------------
Final thought (do this!): use greedy strategy by a policy when C_batch, A_batch, A_worker are "small". -- want to do this. switch to frugal strategy after reaching size limit
*/
// currenly, the code just implements the greedy strategy
void parallel::batch_manager::return_cubes(ast_translation& l2g, vector<expr_ref_vector>const& C_worker, expr_ref_vector const& A_worker) {
auto atom_in_cube = [&](expr_ref_vector const& cube, expr* atom) {
return any_of(cube, [&](expr* e) { return e == atom || (m.is_not(e, e) && e == atom); });
};
auto add_split_atom = [&](expr* atom, unsigned start) {
unsigned stop = m_cubes.size();
for (unsigned i = start; i < stop; ++i) {
m_cubes.push_back(m_cubes[i]);
m_cubes.back().push_back(m.mk_not(atom));
m_cubes[i].push_back(atom);
}
};
std::scoped_lock lock(mux);
unsigned max_cubes = 1000;
bool greedy_mode = (m_cubes.size() <= max_cubes);
unsigned a_worker_start_idx = 0;
//
// --- Phase 1: Greedy split of *existing* cubes on new A_worker atoms (greedy) ---
//
if (greedy_mode) {
for (; a_worker_start_idx < A_worker.size(); ++a_worker_start_idx) {
expr_ref g_atom(l2g(A_worker[a_worker_start_idx]), l2g.to());
if (m_split_atoms.contains(g_atom))
continue;
m_split_atoms.push_back(g_atom);
add_split_atom(g_atom, 0); // split all *existing* cubes
if (m_cubes.size() > max_cubes) {
greedy_mode = false;
++a_worker_start_idx; // start frugal from here
break;
}
}
}
unsigned initial_m_cubes_size = m_cubes.size(); // where to start processing the worker cubes after splitting the EXISTING cubes on the new worker atoms
// --- Phase 2: Process worker cubes (greedy) ---
for (auto& c : C_worker) {
expr_ref_vector g_cube(l2g.to());
for (auto& atom : c)
g_cube.push_back(l2g(atom));
unsigned start = m_cubes.size(); // update start after adding each cube so we only process the current cube being added
m_cubes.push_back(g_cube);
if (greedy_mode) {
// Split new cube on all existing m_split_atoms not in it
for (auto g_atom : m_split_atoms) {
if (!atom_in_cube(g_cube, g_atom)) {
add_split_atom(g_atom, start);
if (m_cubes.size() > max_cubes) {
greedy_mode = false;
break;
}
}
}
}
}
// --- Phase 3: Frugal fallback: only process NEW worker cubes with NEW atoms ---
if (!greedy_mode) {
for (unsigned i = a_worker_start_idx; i < A_worker.size(); ++i) {
expr_ref g_atom(l2g(A_worker[i]), l2g.to());
if (!m_split_atoms.contains(g_atom))
m_split_atoms.push_back(g_atom);
add_split_atom(g_atom, initial_m_cubes_size);
}
}
}
expr_ref_vector parallel::worker::get_split_atoms() {
unsigned k = 2;
auto candidates = ctx->m_pq_scores.get_heap();
std::sort(candidates.begin(), candidates.end(),
[](const auto& a, const auto& b) { return a.priority > b.priority; });
expr_ref_vector top_lits(m);
for (const auto& node: candidates) {
if (ctx->get_assignment(node.key) != l_undef)
continue;
expr* e = ctx->bool_var2expr(node.key);
if (!e)
continue;
top_lits.push_back(expr_ref(e, m));
if (top_lits.size() >= k)
break;
}
IF_VERBOSE(1, verbose_stream() << "top literals " << top_lits << " head size " << ctx->m_pq_scores.size() << " num vars " << ctx->get_num_bool_vars() << "\n");
return top_lits;
}
void parallel::batch_manager::initialize() {
m_state = state::is_running;
m_cubes.reset();
m_cubes.push_back(expr_ref_vector(m)); // push empty cube
m_split_atoms.reset();
}
lbool parallel::operator()(expr_ref_vector const& asms) {
ast_manager& m = ctx.m;
lbool result = l_undef;
unsigned num_threads = std::min((unsigned) std::thread::hardware_concurrency(), ctx.get_fparams().m_threads);
flet<unsigned> _nt(ctx.m_fparams.m_threads, 1);
unsigned thread_max_conflicts = ctx.get_fparams().m_threads_max_conflicts;
unsigned max_conflicts = ctx.get_fparams().m_max_conflicts;
// try first sequential with a low conflict budget to make super easy problems cheap
unsigned max_c = std::min(thread_max_conflicts, 40u);
flet<unsigned> _mc(ctx.get_fparams().m_max_conflicts, max_c);
result = ctx.check(asms.size(), asms.data());
if (result != l_undef || ctx.m_num_conflicts < max_c) {
return result;
}
enum par_exception_kind {
DEFAULT_EX,
ERROR_EX
};
vector<smt_params> smt_params;
scoped_ptr_vector<ast_manager> pms;
scoped_ptr_vector<context> pctxs;
vector<expr_ref_vector> pasms;
ast_manager& m = ctx.m;
scoped_limits sl(m.limit());
unsigned finished_id = UINT_MAX;
std::string ex_msg;
par_exception_kind ex_kind = DEFAULT_EX;
unsigned error_code = 0;
bool done = false;
unsigned num_rounds = 0;
if (m.has_trace_stream())
throw default_exception("trace streams have to be off in parallel mode");
struct scoped_clear_table {
obj_hashtable<expr>& ht;
scoped_clear_table(obj_hashtable<expr>& ht) : ht(ht) {} // Constructor: Takes a reference to a hash table when the object is created and saves it.
~scoped_clear_table() { ht.reset(); } // Destructor: When the scoped_clear_table object goes out of scope, it automatically calls reset() on that hash table, clearing it
};
scoped_clear_table clear(m_assumptions_used); // creates a scoped_clear_table named clear, bound to m_assumptions_used
{
m_batch_manager.initialize();
m_workers.reset();
scoped_limits sl(m.limit());
flet<unsigned> _nt(ctx.m_fparams.m_threads, 1);
SASSERT(num_threads > 1);
for (unsigned i = 0; i < num_threads; ++i)
m_workers.push_back(alloc(worker, i, *this, asms)); // i.e. "new worker(i, *this, asms)"
// THIS WILL ALLOW YOU TO CANCEL ALL THE CHILD THREADS
// within the lexical scope of the code block, creates a data structure that allows you to push children
// objects to the limit object, so if someone cancels the parent object, the cancellation propagates to the children
// and that cancellation has the lifetime of the scope
// even if this code doesn't expliclty kill the main thread, still applies bc if you e.g. Ctrl+C the main thread, the children threads need to be cancelled
for (auto w : m_workers)
sl.push_child(&(w->limit()));
// Launch threads
vector<std::thread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = std::thread([&, i]() {
m_workers[i]->run();
});
}
// Wait for all threads to finish
for (auto& th : threads)
th.join();
for (auto w : m_workers)
w->collect_statistics(ctx.m_aux_stats);
params_ref params = ctx.get_params();
for (unsigned i = 0; i < num_threads; ++i) {
smt_params.push_back(ctx.get_fparams());
smt_params.back().m_preprocess = false;
}
for (unsigned i = 0; i < num_threads; ++i) {
ast_manager* new_m = alloc(ast_manager, m, true);
pms.push_back(new_m);
pctxs.push_back(alloc(context, *new_m, smt_params[i], params));
context& new_ctx = *pctxs.back();
context::copy(ctx, new_ctx, true);
new_ctx.set_random_seed(i + ctx.get_fparams().m_random_seed);
ast_translation tr(m, *new_m);
pasms.push_back(tr(asms));
sl.push_child(&(new_m->limit()));
}
m_workers.clear();
return m_batch_manager.get_result(); // i.e. all threads have finished all of their cubes -- so if state::is_running is still true, means the entire formula is unsat (otherwise a thread would have returned l_undef)
auto cube = [](context& ctx, expr_ref_vector& lasms, expr_ref& c) {
lookahead lh(ctx);
c = lh.choose();
if (c) {
if ((ctx.get_random_value() % 2) == 0)
c = c.get_manager().mk_not(c);
lasms.push_back(c);
}
};
obj_hashtable<expr> unit_set;
expr_ref_vector unit_trail(ctx.m);
unsigned_vector unit_lim;
for (unsigned i = 0; i < num_threads; ++i) unit_lim.push_back(0);
std::function<void(void)> collect_units = [&,this]() {
//return; -- has overhead
for (unsigned i = 0; i < num_threads; ++i) {
context& pctx = *pctxs[i];
pctx.pop_to_base_lvl();
ast_translation tr(pctx.m, ctx.m);
unsigned sz = pctx.assigned_literals().size();
for (unsigned j = unit_lim[i]; j < sz; ++j) {
literal lit = pctx.assigned_literals()[j];
//IF_VERBOSE(0, verbose_stream() << "(smt.thread " << i << " :unit " << lit << " " << pctx.is_relevant(lit.var()) << ")\n";);
if (!pctx.is_relevant(lit.var()))
continue;
expr_ref e(pctx.bool_var2expr(lit.var()), pctx.m);
if (lit.sign()) e = pctx.m.mk_not(e);
expr_ref ce(tr(e.get()), ctx.m);
if (!unit_set.contains(ce)) {
unit_set.insert(ce);
unit_trail.push_back(ce);
}
}
}
unsigned sz = unit_trail.size();
for (unsigned i = 0; i < num_threads; ++i) {
context& pctx = *pctxs[i];
ast_translation tr(ctx.m, pctx.m);
for (unsigned j = unit_lim[i]; j < sz; ++j) {
expr_ref src(ctx.m), dst(pctx.m);
dst = tr(unit_trail.get(j));
pctx.assert_expr(dst);
}
unit_lim[i] = pctx.assigned_literals().size();
}
IF_VERBOSE(1, verbose_stream() << "(smt.thread :units " << sz << ")\n");
};
std::mutex mux;
auto worker_thread = [&](int i) {
try {
context& pctx = *pctxs[i];
ast_manager& pm = *pms[i];
expr_ref_vector lasms(pasms[i]);
expr_ref c(pm);
pctx.get_fparams().m_max_conflicts = std::min(thread_max_conflicts, max_conflicts);
if (num_rounds > 0 && (num_rounds % pctx.get_fparams().m_threads_cube_frequency) == 0)
cube(pctx, lasms, c);
IF_VERBOSE(1, verbose_stream() << "(smt.thread " << i;
if (num_rounds > 0) verbose_stream() << " :round " << num_rounds;
if (c) verbose_stream() << " :cube " << mk_bounded_pp(c, pm, 3);
verbose_stream() << ")\n";);
lbool r = pctx.check(lasms.size(), lasms.data());
if (r == l_undef && pctx.m_num_conflicts >= max_conflicts)
; // no-op
else if (r == l_undef && pctx.m_num_conflicts >= thread_max_conflicts)
return;
else if (r == l_false && pctx.unsat_core().contains(c)) {
IF_VERBOSE(1, verbose_stream() << "(smt.thread " << i << " :learn " << mk_bounded_pp(c, pm, 3) << ")");
pctx.assert_expr(mk_not(mk_and(pctx.unsat_core())));
return;
}
bool first = false;
{
std::lock_guard<std::mutex> lock(mux);
if (finished_id == UINT_MAX) {
finished_id = i;
first = true;
result = r;
done = true;
}
if (!first && r != l_undef && result == l_undef) {
finished_id = i;
result = r;
}
else if (!first) return;
}
for (ast_manager* m : pms) {
if (m != &pm) m->limit().cancel();
}
}
catch (z3_error & err) {
if (finished_id == UINT_MAX) {
error_code = err.error_code();
ex_kind = ERROR_EX;
done = true;
}
}
catch (z3_exception & ex) {
if (finished_id == UINT_MAX) {
ex_msg = ex.what();
ex_kind = DEFAULT_EX;
done = true;
}
}
catch (...) {
if (finished_id == UINT_MAX) {
ex_msg = "unknown exception";
ex_kind = ERROR_EX;
done = true;
}
}
};
// for debugging: num_threads = 1;
while (true) {
vector<std::thread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = std::thread([&, i]() { worker_thread(i); });
}
for (auto & th : threads) {
th.join();
}
if (done) break;
collect_units();
++num_rounds;
max_conflicts = (max_conflicts < thread_max_conflicts) ? 0 : (max_conflicts - thread_max_conflicts);
thread_max_conflicts *= 2;
}
for (context* c : pctxs) {
c->collect_statistics(ctx.m_aux_stats);
}
if (finished_id == UINT_MAX) {
switch (ex_kind) {
case ERROR_EX: throw z3_error(error_code);
default: throw default_exception(std::move(ex_msg));
}
}
model_ref mdl;
context& pctx = *pctxs[finished_id];
ast_translation tr(*pms[finished_id], m);
switch (result) {
case l_true:
pctx.get_model(mdl);
if (mdl)
ctx.set_model(mdl->translate(tr));
break;
case l_false:
ctx.m_unsat_core.reset();
for (expr* e : pctx.unsat_core())
ctx.m_unsat_core.push_back(tr(e));
break;
default:
break;
}
return result;
}
}

View file

@ -19,124 +19,16 @@ Revision History:
#pragma once
#include "smt/smt_context.h"
#include <thread>
namespace smt {
class parallel {
context& ctx;
unsigned num_threads;
struct shared_clause {
unsigned source_worker_id;
expr_ref clause;
};
class batch_manager {
enum state {
is_running,
is_sat,
is_unsat,
is_exception_msg,
is_exception_code
};
ast_manager& m;
parallel& p;
std::mutex mux;
state m_state = state::is_running;
expr_ref_vector m_split_atoms; // atoms to split on
vector<expr_ref_vector> m_cubes;
unsigned m_max_batch_size = 10;
unsigned m_exception_code = 0;
std::string m_exception_msg;
vector<shared_clause> shared_clause_trail; // store all shared clauses with worker IDs
obj_hashtable<expr> shared_clause_set; // for duplicate filtering on per-thread clause expressions
// called from batch manager to cancel other workers if we've reached a verdict
void cancel_workers() {
IF_VERBOSE(1, verbose_stream() << "Canceling workers\n");
for (auto& w : p.m_workers)
w->cancel();
}
public:
batch_manager(ast_manager& m, parallel& p) : m(m), p(p), m_split_atoms(m) { }
void initialize();
void set_unsat(ast_translation& l2g, expr_ref_vector const& unsat_core);
void set_sat(ast_translation& l2g, model& m);
void set_exception(std::string const& msg);
void set_exception(unsigned error_code);
//
// worker threads ask the batch manager for a supply of cubes to check.
// they pass in a translation function from the global context to local context (ast-manager). It is called g2l.
// The batch manager returns a list of cubes to solve.
//
void get_cubes(ast_translation& g2l, vector<expr_ref_vector>& cubes);
//
// worker threads return unprocessed cubes to the batch manager together with split literal candidates.
// the batch manager re-enqueues unprocessed cubes and optionally splits them using the split_atoms returned by this and workers.
//
void return_cubes(ast_translation& l2g, vector<expr_ref_vector>const& cubes, expr_ref_vector const& split_atoms);
void report_assumption_used(ast_translation& l2g, expr* assumption);
void collect_clause(ast_translation& l2g, unsigned source_worker_id, expr* e);
expr_ref_vector return_shared_clauses(ast_translation& g2l, unsigned& worker_limit, unsigned worker_id);
lbool get_result() const;
};
class worker {
unsigned id; // unique identifier for the worker
parallel& p;
batch_manager& b;
ast_manager m;
expr_ref_vector asms;
smt_params m_smt_params;
scoped_ptr<context> ctx;
unsigned m_max_conflicts = 800; // the global budget for all work this worker can do across cubes in the current run.
unsigned m_max_thread_conflicts = 100; // the per-cube limit for how many conflicts the worker can spend on a single cube before timing out on it and moving on
unsigned m_num_shared_units = 0;
unsigned m_shared_clause_limit = 0; // remembers the index into shared_clause_trail marking the boundary between "old" and "new" clauses to share
void share_units(ast_translation& l2g);
lbool check_cube(expr_ref_vector const& cube);
void update_max_thread_conflicts() {
m_max_thread_conflicts *= 2;
} // allow for backoff scheme of conflicts within the thread for cube timeouts.
public:
worker(unsigned id, parallel& p, expr_ref_vector const& _asms);
void run();
expr_ref_vector get_split_atoms();
void collect_shared_clauses(ast_translation& g2l);
void cancel() {
IF_VERBOSE(1, verbose_stream() << "Worker " << id << " canceling\n");
m.limit().cancel();
}
void collect_statistics(::statistics& st) const {
IF_VERBOSE(1, verbose_stream() << "Collecting statistics for worker " << id << "\n");
ctx->collect_statistics(st);
}
reslimit& limit() {
return m.limit();
}
};
obj_hashtable<expr> m_assumptions_used; // assumptions used in unsat cores, to be used in final core
batch_manager m_batch_manager;
ptr_vector<worker> m_workers;
public:
parallel(context& ctx) :
ctx(ctx),
num_threads(std::min(
(unsigned)std::thread::hardware_concurrency(),
ctx.get_fparams().m_threads)),
m_batch_manager(ctx.m, *this) {}
parallel(context& ctx): ctx(ctx) {}
lbool operator()(expr_ref_vector const& asms);
};
}