mirror of
				https://github.com/Z3Prover/z3
				synced 2025-11-04 13:29:11 +00:00 
			
		
		
		
	Implement mini_quip
This commit is contained in:
		
							parent
							
								
									83e3a79bd1
								
							
						
					
					
						commit
						19471f9fa3
					
				
					 2 changed files with 786 additions and 72 deletions
				
			
		| 
						 | 
				
			
			@ -395,75 +395,3 @@ test("data/horn3.smt2")
 | 
			
		|||
test("data/horn4.smt2")
 | 
			
		||||
test("data/horn5.smt2")
 | 
			
		||||
# test("data/horn6.smt2") # takes long time to finish
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
# TBD: Quip variant of IC3
 | 
			
		||||
 | 
			
		||||
must = True
 | 
			
		||||
may = False
 | 
			
		||||
 | 
			
		||||
class QGoal:
 | 
			
		||||
    def __init__(self, cube, parent, level, must):
 | 
			
		||||
        self.level = level
 | 
			
		||||
        self.cube = cube
 | 
			
		||||
        self.parent = parent
 | 
			
		||||
        self.must = must
 | 
			
		||||
 | 
			
		||||
class Quip(MiniIC3):
 | 
			
		||||
 | 
			
		||||
    # prev & tras -> r', such that r' intersects with cube
 | 
			
		||||
    def add_reachable(self, prev, cube):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(self.trans)
 | 
			
		||||
        s.add(prev)
 | 
			
		||||
        s.add(Or(cube))
 | 
			
		||||
        is_sat = s.check()
 | 
			
		||||
        assert is_sat == sat
 | 
			
		||||
        m = s.model();
 | 
			
		||||
        result = self.values2literals(m, cube)
 | 
			
		||||
        assert result
 | 
			
		||||
        self.reachable.add(result)
 | 
			
		||||
 | 
			
		||||
    # A state s0 and level f0 such that
 | 
			
		||||
    # not(s0) is f0-1 inductive
 | 
			
		||||
    def quip_blocked(self, s0, f0):
 | 
			
		||||
        self.push_heap(QGoal(self.next(s0), None, f0, must))
 | 
			
		||||
        while self.goals:
 | 
			
		||||
           f, g = heapq.heappop(self.goals)
 | 
			
		||||
           sys.stdout.write("%d." % f)
 | 
			
		||||
           sys.stdout.flush()
 | 
			
		||||
           if f == 0:
 | 
			
		||||
              if g.must:
 | 
			
		||||
                 print("")
 | 
			
		||||
                 return g
 | 
			
		||||
              self.add_reachable(self.init, p.parent.cube)
 | 
			
		||||
              continue
 | 
			
		||||
 | 
			
		||||
        # TBD
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
                        
 | 
			
		||||
    def run(self):
 | 
			
		||||
        if not check_disjoint(self.init, self.bad):
 | 
			
		||||
           return "goal is reached in initial state"
 | 
			
		||||
        level = 0
 | 
			
		||||
        while True:
 | 
			
		||||
            inv = self.is_valid()
 | 
			
		||||
            if inv is not None:
 | 
			
		||||
                return inv
 | 
			
		||||
            is_sat, cube = self.unfold()
 | 
			
		||||
            if is_sat == unsat:
 | 
			
		||||
               level += 1
 | 
			
		||||
               print("Unfold %d" % level)
 | 
			
		||||
               sys.stdout.flush()
 | 
			
		||||
               self.add_solver()
 | 
			
		||||
            elif is_sat == sat:
 | 
			
		||||
               cex = self.quipie_blocked(cube, level)
 | 
			
		||||
               if cex is not None:
 | 
			
		||||
                  return cex
 | 
			
		||||
            else:
 | 
			
		||||
               return is_sat  
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										786
									
								
								examples/python/mini_quip.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										786
									
								
								examples/python/mini_quip.py
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,786 @@
 | 
			
		|||
from z3 import *
 | 
			
		||||
import heapq
 | 
			
		||||
import numpy
 | 
			
		||||
import time
 | 
			
		||||
import random
 | 
			
		||||
 | 
			
		||||
verbose = True
 | 
			
		||||
 | 
			
		||||
# Simplistic (and fragile) converter from
 | 
			
		||||
# a class of Horn clauses corresponding to
 | 
			
		||||
# a transition system into a transition system
 | 
			
		||||
# representation as <init, trans, goal>
 | 
			
		||||
# It assumes it is given three Horn clauses
 | 
			
		||||
# of the form:
 | 
			
		||||
#  init(x) => Invariant(x)
 | 
			
		||||
#  Invariant(x) and trans(x,x') => Invariant(x')
 | 
			
		||||
#  Invariant(x) and goal(x) => Goal(x)
 | 
			
		||||
# where Invariant and Goal are uninterpreted predicates
 | 
			
		||||
 | 
			
		||||
class Horn2Transitions:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.trans = True
 | 
			
		||||
        self.init = True
 | 
			
		||||
        self.inputs = []
 | 
			
		||||
        self.goal = True
 | 
			
		||||
        self.index = 0
 | 
			
		||||
 | 
			
		||||
    def parse(self, file):
 | 
			
		||||
        fp = Fixedpoint()
 | 
			
		||||
        goals = fp.parse_file(file)
 | 
			
		||||
        for r in fp.get_rules():
 | 
			
		||||
            if not is_quantifier(r):
 | 
			
		||||
                continue
 | 
			
		||||
            b = r.body()
 | 
			
		||||
            if not is_implies(b):
 | 
			
		||||
                continue
 | 
			
		||||
            f = b.arg(0)
 | 
			
		||||
            g = b.arg(1)
 | 
			
		||||
            if self.is_goal(f, g):
 | 
			
		||||
                continue
 | 
			
		||||
            if self.is_transition(f, g):
 | 
			
		||||
                continue
 | 
			
		||||
            if self.is_init(f, g):
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
    def is_pred(self, p, name):
 | 
			
		||||
        return is_app(p) and p.decl().name() == name
 | 
			
		||||
 | 
			
		||||
    def is_goal(self, body, head):
 | 
			
		||||
        if not self.is_pred(head, "Goal"):
 | 
			
		||||
            return False
 | 
			
		||||
        pred, inv = self.is_body(body)
 | 
			
		||||
        if pred is None:
 | 
			
		||||
            return False
 | 
			
		||||
        self.goal = self.subst_vars("x", inv, pred)
 | 
			
		||||
        self.goal = self.subst_vars("i", self.goal, self.goal)
 | 
			
		||||
        self.inputs += self.vars
 | 
			
		||||
        self.inputs = list(set(self.inputs))
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def is_body(self, body):
 | 
			
		||||
        if not is_and(body):
 | 
			
		||||
            return None, None
 | 
			
		||||
        fmls = [f for f in body.children() if self.is_inv(f) is None]
 | 
			
		||||
        inv = None
 | 
			
		||||
        for f in body.children():
 | 
			
		||||
            if self.is_inv(f) is not None:
 | 
			
		||||
                inv = f;
 | 
			
		||||
                break
 | 
			
		||||
        return And(fmls), inv
 | 
			
		||||
 | 
			
		||||
    def is_inv(self, f):
 | 
			
		||||
        if self.is_pred(f, "Invariant"):
 | 
			
		||||
            return f
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def is_transition(self, body, head):
 | 
			
		||||
        pred, inv0 = self.is_body(body)
 | 
			
		||||
        if pred is None:
 | 
			
		||||
            return False
 | 
			
		||||
        inv1 = self.is_inv(head)
 | 
			
		||||
        if inv1 is None:
 | 
			
		||||
            return False
 | 
			
		||||
        pred = self.subst_vars("x",  inv0, pred)
 | 
			
		||||
        self.xs = self.vars
 | 
			
		||||
        pred = self.subst_vars("xn", inv1, pred)
 | 
			
		||||
        self.xns = self.vars
 | 
			
		||||
        pred = self.subst_vars("i", pred, pred)
 | 
			
		||||
        self.inputs += self.vars
 | 
			
		||||
        self.inputs = list(set(self.inputs))
 | 
			
		||||
        self.trans = pred
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def is_init(self, body, head):
 | 
			
		||||
        for f in body.children():
 | 
			
		||||
            if self.is_inv(f) is not None:
 | 
			
		||||
               return False
 | 
			
		||||
        inv = self.is_inv(head)
 | 
			
		||||
        if inv is None:
 | 
			
		||||
            return False
 | 
			
		||||
        self.init = self.subst_vars("x", inv, body)
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def subst_vars(self, prefix, inv, fml):
 | 
			
		||||
        subst = self.mk_subst(prefix, inv)
 | 
			
		||||
        self.vars = [ v for (k,v) in subst ]
 | 
			
		||||
        return substitute(fml, subst)
 | 
			
		||||
 | 
			
		||||
    def mk_subst(self, prefix, inv):
 | 
			
		||||
        self.index = 0
 | 
			
		||||
        if self.is_inv(inv) is not None:
 | 
			
		||||
            return [(f, self.mk_bool(prefix)) for f in inv.children()]
 | 
			
		||||
        else:
 | 
			
		||||
            vars = self.get_vars(inv)
 | 
			
		||||
            return [(f, self.mk_bool(prefix)) for f in vars]
 | 
			
		||||
 | 
			
		||||
    def mk_bool(self, prefix):
 | 
			
		||||
        self.index += 1
 | 
			
		||||
        return Bool("%s%d" % (prefix, self.index))
 | 
			
		||||
 | 
			
		||||
    def get_vars(self, f, rs=[]):
 | 
			
		||||
        if is_var(f):
 | 
			
		||||
            return z3util.vset(rs + [f], str)
 | 
			
		||||
        else:
 | 
			
		||||
            for f_ in f.children():
 | 
			
		||||
                rs = self.get_vars(f_, rs)
 | 
			
		||||
            return z3util.vset(rs, str)
 | 
			
		||||
 | 
			
		||||
# Produce a finite domain solver.
 | 
			
		||||
# The theory QF_FD covers bit-vector formulas
 | 
			
		||||
# and pseudo-Boolean constraints.
 | 
			
		||||
# By default cardinality and pseudo-Boolean
 | 
			
		||||
# constraints are converted to clauses. To override
 | 
			
		||||
# this default for cardinality constraints
 | 
			
		||||
# we set sat.cardinality.solver to True
 | 
			
		||||
 | 
			
		||||
def fd_solver():
 | 
			
		||||
    s = SolverFor("QF_FD")
 | 
			
		||||
    s.set("sat.cardinality.solver", True)
 | 
			
		||||
    return s
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# negate, avoid double negation
 | 
			
		||||
def negate(f):
 | 
			
		||||
    if is_not(f):
 | 
			
		||||
        return f.arg(0)
 | 
			
		||||
    else:
 | 
			
		||||
        return Not(f)
 | 
			
		||||
 | 
			
		||||
def cube2clause(cube):
 | 
			
		||||
    return Or([negate(f) for f in cube])
 | 
			
		||||
 | 
			
		||||
class State:
 | 
			
		||||
    def __init__(self, s):
 | 
			
		||||
        self.R = set([])
 | 
			
		||||
        self.solver = s
 | 
			
		||||
 | 
			
		||||
    def add(self, clause):
 | 
			
		||||
        if clause not in self.R:
 | 
			
		||||
           self.R |= { clause }
 | 
			
		||||
           self.solver.add(clause)
 | 
			
		||||
 | 
			
		||||
def is_seq(f):
 | 
			
		||||
    return isinstance(f, list) or isinstance(f, tuple) or isinstance(f, AstVector)
 | 
			
		||||
 | 
			
		||||
# Check if the initial state is bad
 | 
			
		||||
def check_disjoint(a, b):
 | 
			
		||||
    s = fd_solver()
 | 
			
		||||
    s.add(a)
 | 
			
		||||
    s.add(b)
 | 
			
		||||
    return unsat == s.check()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Remove clauses that are subsumed
 | 
			
		||||
def prune(R):
 | 
			
		||||
    removed = set([])
 | 
			
		||||
    s = fd_solver()
 | 
			
		||||
    for f1 in R:
 | 
			
		||||
        s.push()
 | 
			
		||||
        for f2 in R:
 | 
			
		||||
            if f2 not in removed:
 | 
			
		||||
               s.add(Not(f2) if f1.eq(f2) else f2)
 | 
			
		||||
        if s.check() == unsat:
 | 
			
		||||
            removed |= { f1 }
 | 
			
		||||
        s.pop()
 | 
			
		||||
    return R - removed
 | 
			
		||||
 | 
			
		||||
# Quip variant of IC3
 | 
			
		||||
 | 
			
		||||
must = True
 | 
			
		||||
may = False
 | 
			
		||||
 | 
			
		||||
class QLemma:
 | 
			
		||||
    def __init__(self, c):
 | 
			
		||||
        self.cube = c
 | 
			
		||||
        self.clause = cube2clause(c)
 | 
			
		||||
        self.bad = False
 | 
			
		||||
 | 
			
		||||
    def __hash__(self):
 | 
			
		||||
        return hash(tuple(set(self.cube)))
 | 
			
		||||
 | 
			
		||||
    def __eq__(self, qlemma2):
 | 
			
		||||
        if set(self.cube) == set(qlemma2.cube) and self.bad == qlemma2.bad:
 | 
			
		||||
            return True
 | 
			
		||||
        else:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    def __ne__():
 | 
			
		||||
        if not self.__eq__(self, qlemma2):
 | 
			
		||||
            return True
 | 
			
		||||
        else:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QGoal:
 | 
			
		||||
    def __init__(self, cube, parent, level, must, encounter):
 | 
			
		||||
        self.level = level
 | 
			
		||||
        self.cube = cube
 | 
			
		||||
        self.parent = parent
 | 
			
		||||
        self.must = must
 | 
			
		||||
 | 
			
		||||
    def __lt__(self, other):
 | 
			
		||||
        return self.level < other.level
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QReach:
 | 
			
		||||
 | 
			
		||||
    # it is assumed that there is a single initial state
 | 
			
		||||
    # with all latches set to 0 in hardware design, so
 | 
			
		||||
    # here init will always give a state where all variable are set to 0
 | 
			
		||||
    def __init__(self, init, xs):
 | 
			
		||||
        self.xs = xs
 | 
			
		||||
        self.constant_xs = [Not(x) for x in self.xs]
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(init)
 | 
			
		||||
        is_sat = s.check()
 | 
			
		||||
        assert is_sat == sat
 | 
			
		||||
        m = s.model()
 | 
			
		||||
        # xs is a list, "for" will keep the order when iterating
 | 
			
		||||
        self.states = numpy.array([[False for x in self.xs]])  # all set to False
 | 
			
		||||
        assert not numpy.max(self.states)  # since all element is False, so maximum should be False
 | 
			
		||||
 | 
			
		||||
    # check if new state exists
 | 
			
		||||
    def is_exist(self, state):
 | 
			
		||||
        if state in self.states:
 | 
			
		||||
            return True
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    def enumerate(self, i, state_b, state):
 | 
			
		||||
        while i < len(state) and state[i] not in self.xs:
 | 
			
		||||
            i += 1
 | 
			
		||||
        if i >= len(state):
 | 
			
		||||
            if state_b.tolist() not in self.states.tolist():
 | 
			
		||||
                self.states = numpy.append(self.states, [state_b], axis = 0)
 | 
			
		||||
                return state_b
 | 
			
		||||
            else:
 | 
			
		||||
                return None
 | 
			
		||||
        state_b[i] = False
 | 
			
		||||
        if self.enumerate(i+1, state_b, state) is not None:
 | 
			
		||||
            return state_b
 | 
			
		||||
        else:
 | 
			
		||||
            state_b[i] = True
 | 
			
		||||
            return self.enumerate(i+1, state_b, state)
 | 
			
		||||
 | 
			
		||||
    def is_full_state(self, state):
 | 
			
		||||
        for i in range(len(self.xs)):
 | 
			
		||||
            if state[i] in self.xs:
 | 
			
		||||
                return False
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    def add(self, cube):
 | 
			
		||||
        state = self.cube2partial_state(cube)
 | 
			
		||||
        assert len(state) == len(self.xs)
 | 
			
		||||
        if not self.is_exist(state):
 | 
			
		||||
            return None
 | 
			
		||||
        if self.is_full_state(state):
 | 
			
		||||
            self.states = numpy.append(self.states, [state], axis = 0)
 | 
			
		||||
        else:
 | 
			
		||||
            # state[i] is instance, state_b[i] is boolean
 | 
			
		||||
            state_b = numpy.array(state)
 | 
			
		||||
            for i in range(len(state)):  # state is of same length as self.xs
 | 
			
		||||
                # i-th literal in state hasn't been assigned value
 | 
			
		||||
                # init un-assigned literals in state_b as True
 | 
			
		||||
                # make state_b only contain boolean value
 | 
			
		||||
                if state[i] in self.xs:
 | 
			
		||||
                    state_b[i] = True
 | 
			
		||||
                else:
 | 
			
		||||
                    state_b[i] = is_true(state[i])
 | 
			
		||||
            if self.enumerate(0, state_b, state) is not None:
 | 
			
		||||
                lits_to_remove = set([negate(f) for f in list(set(cube) - set(self.constant_xs))])
 | 
			
		||||
                self.constant_xs = list(set(self.constant_xs) - lits_to_remove)
 | 
			
		||||
                return state
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def cube2partial_state(self, cube):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(And(cube))
 | 
			
		||||
        is_sat = s.check()
 | 
			
		||||
        assert is_sat == sat
 | 
			
		||||
        m = s.model()
 | 
			
		||||
        state = numpy.array([m.eval(x) for x in self.xs])
 | 
			
		||||
        return state
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def state2cube(self, s):
 | 
			
		||||
        result = copy.deepcopy(self.xs)  # x1, x2, ...
 | 
			
		||||
        for i in range(len(self.xs)):
 | 
			
		||||
            if not s[i]:
 | 
			
		||||
                result[i] = Not(result[i])
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
    def intersect(self, cube):
 | 
			
		||||
        state = self.cube2partial_state(cube)
 | 
			
		||||
        mask = True
 | 
			
		||||
        for i in range(len(self.xs)):
 | 
			
		||||
            if is_true(state[i]) or is_false(state[i]):
 | 
			
		||||
                mask = (self.states[:, i] == state[i]) & mask
 | 
			
		||||
        intersects = numpy.reshape(self.states[mask], (-1, len(self.xs)))
 | 
			
		||||
        if intersects.size > 0:
 | 
			
		||||
            return And(self.state2cube(intersects[0]))  # only need to return one single intersect
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Quip:
 | 
			
		||||
 | 
			
		||||
    def __init__(self, init, trans, goal, x0, inputs, xn):
 | 
			
		||||
        self.x0 = x0
 | 
			
		||||
        self.inputs = inputs
 | 
			
		||||
        self.xn = xn
 | 
			
		||||
        self.init = init
 | 
			
		||||
        self.bad = goal
 | 
			
		||||
        self.trans = trans
 | 
			
		||||
        self.min_cube_solver = fd_solver()
 | 
			
		||||
        self.min_cube_solver.add(Not(trans))
 | 
			
		||||
        self.goals = []
 | 
			
		||||
        s = State(fd_solver())
 | 
			
		||||
        s.add(init)
 | 
			
		||||
        s.solver.add(trans)  # check if a bad state can be reached in one step from current level
 | 
			
		||||
        self.states = [s]
 | 
			
		||||
        self.s_bad = fd_solver()
 | 
			
		||||
        self.s_good = fd_solver()
 | 
			
		||||
        self.s_bad.add(self.bad)
 | 
			
		||||
        self.s_good.add(Not(self.bad))
 | 
			
		||||
        self.reachable = QReach(self.init, x0)
 | 
			
		||||
        self.frames = []  # frames is a 2d list, each row (representing level) is a set containing several (clause, bad) pairs
 | 
			
		||||
        self.count_may = 0
 | 
			
		||||
 | 
			
		||||
    def next(self, f):
 | 
			
		||||
        if is_seq(f):
 | 
			
		||||
           return [self.next(f1) for f1 in f]
 | 
			
		||||
        return substitute(f, zip(self.x0, self.xn))
 | 
			
		||||
 | 
			
		||||
    def prev(self, f):
 | 
			
		||||
        if is_seq(f):
 | 
			
		||||
           return [self.prev(f1) for f1 in f]
 | 
			
		||||
        return substitute(f, zip(self.xn, self.x0))
 | 
			
		||||
 | 
			
		||||
    def add_solver(self):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(self.trans)
 | 
			
		||||
        self.states += [State(s)]
 | 
			
		||||
 | 
			
		||||
    def R(self, i):
 | 
			
		||||
        return And(self.states[i].R)
 | 
			
		||||
 | 
			
		||||
    def value2literal(self, m, x):
 | 
			
		||||
        value = m.eval(x)
 | 
			
		||||
        if is_true(value):
 | 
			
		||||
            return x
 | 
			
		||||
        if is_false(value):
 | 
			
		||||
            return Not(x)
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def values2literals(self, m, xs):
 | 
			
		||||
        p = [self.value2literal(m, x) for x in xs]
 | 
			
		||||
        return [x for x in p if x is not None]
 | 
			
		||||
 | 
			
		||||
    def project0(self, m):
 | 
			
		||||
        return self.values2literals(m, self.x0)
 | 
			
		||||
 | 
			
		||||
    def projectI(self, m):
 | 
			
		||||
        return self.values2literals(m, self.inputs)
 | 
			
		||||
 | 
			
		||||
    def projectN(self, m):
 | 
			
		||||
        return self.values2literals(m, self.xn)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # Block a cube by asserting the clause corresponding to its negation
 | 
			
		||||
    def block_cube(self, i, cube):
 | 
			
		||||
        self.assert_clause(i, cube2clause(cube))
 | 
			
		||||
 | 
			
		||||
    # Add a clause to levels 1 until i
 | 
			
		||||
    def assert_clause(self, i, clause):
 | 
			
		||||
        for j in range(1, i + 1):
 | 
			
		||||
            self.states[j].add(clause)
 | 
			
		||||
            assert str(self.states[j].solver) != str([False])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # minimize cube that is core of Dual solver.
 | 
			
		||||
    # this assumes that props & cube => Trans
 | 
			
		||||
    # which means props & cube can only give us a Tr in Trans,
 | 
			
		||||
    # and it will never make !Trans sat
 | 
			
		||||
    def minimize_cube(self, cube, inputs, lits):
 | 
			
		||||
        # min_cube_solver has !Trans (min_cube.solver.add(!Trans))
 | 
			
		||||
        is_sat = self.min_cube_solver.check(lits + [c for c in cube] + [i for i in inputs])
 | 
			
		||||
        assert is_sat == unsat
 | 
			
		||||
        # unsat_core gives us some lits which make Tr sat,
 | 
			
		||||
        # so that we can ignore other lits and include more states
 | 
			
		||||
        core = self.min_cube_solver.unsat_core()
 | 
			
		||||
        assert core
 | 
			
		||||
        return [c for c in core if c in set(cube)]
 | 
			
		||||
 | 
			
		||||
    # push a goal on a heap
 | 
			
		||||
    def push_heap(self, goal):
 | 
			
		||||
        heapq.heappush(self.goals, (goal.level, goal))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # make sure cube to be blocked excludes all reachable states
 | 
			
		||||
    def check_reachable(self, cube):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        for state in self.reachable.states:
 | 
			
		||||
            s.push()
 | 
			
		||||
            r = self.reachable.state2cube(state)
 | 
			
		||||
            s.add(And(self.prev(r)))
 | 
			
		||||
            s.add(self.prev(cube))
 | 
			
		||||
            is_sat = s.check()
 | 
			
		||||
            s.pop()
 | 
			
		||||
            if is_sat == sat:
 | 
			
		||||
                # if sat, it means the cube to be blocked contains reachable states
 | 
			
		||||
                # so it is an invalid cube
 | 
			
		||||
                return False
 | 
			
		||||
        # if all fail, is_sat will be unsat
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    # Rudimentary generalization:
 | 
			
		||||
    # If the cube is already unsat with respect to transition relation
 | 
			
		||||
    # extract a core (not necessarily minimal)
 | 
			
		||||
    # otherwise, just return the cube.
 | 
			
		||||
    def generalize(self, cube, f):
 | 
			
		||||
        s = self.states[f - 1].solver
 | 
			
		||||
        if unsat == s.check(cube):
 | 
			
		||||
            core = s.unsat_core()
 | 
			
		||||
            if self.check_reachable(core):
 | 
			
		||||
                return core, f
 | 
			
		||||
        return cube, f
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def valid_reachable(self, level):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(self.init)
 | 
			
		||||
        for i in range(level):
 | 
			
		||||
            s.add(self.trans)
 | 
			
		||||
        for state in self.reachable.states:
 | 
			
		||||
            s.push()
 | 
			
		||||
            s.add(And(self.next(self.reachable.state2cube(state))))
 | 
			
		||||
            print self.reachable.state2cube(state)
 | 
			
		||||
            print s.check()
 | 
			
		||||
            s.pop()
 | 
			
		||||
 | 
			
		||||
    def lemmas(self, level):
 | 
			
		||||
        return [(l.clause, l.bad) for l in self.frames[level]]
 | 
			
		||||
 | 
			
		||||
    # whenever a new reachable state is found, we use it to mark some existing lemmas as bad lemmas
 | 
			
		||||
    def mark_bad_lemmas(self, new):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        reset = False
 | 
			
		||||
        for frame in self.frames:
 | 
			
		||||
            for lemma in frame:
 | 
			
		||||
                s.push()
 | 
			
		||||
                s.add(lemma.clause)
 | 
			
		||||
                is_sat = s.check(new)
 | 
			
		||||
                if is_sat == unsat:
 | 
			
		||||
                    reset = True
 | 
			
		||||
                    lemma.bad = True
 | 
			
		||||
                s.pop()
 | 
			
		||||
        if reset:
 | 
			
		||||
            self.states = [self.states[0]]
 | 
			
		||||
            for i in range(1, len(self.frames)):
 | 
			
		||||
                self.add_solver()
 | 
			
		||||
                for lemma in self.frames[i]:
 | 
			
		||||
                    if not lemma.bad:
 | 
			
		||||
                        self.states[i].add(lemma.clause)
 | 
			
		||||
 | 
			
		||||
    # prev & tras -> r', such that r' intersects with cube
 | 
			
		||||
    def add_reachable(self, prev, cube):
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(self.trans)
 | 
			
		||||
        s.add(prev)
 | 
			
		||||
        s.add(self.next(And(cube)))
 | 
			
		||||
        is_sat = s.check()
 | 
			
		||||
        assert is_sat == sat
 | 
			
		||||
        m = s.model()
 | 
			
		||||
        new = self.projectN(m)
 | 
			
		||||
        state = self.reachable.add(self.prev(new))  # always add as non-primed
 | 
			
		||||
        if state is not None:  # if self.states do not have new state yet
 | 
			
		||||
            self.mark_bad_lemmas(self.prev(new))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # Check if the negation of cube is inductive at level f
 | 
			
		||||
    def is_inductive(self, f, cube):
 | 
			
		||||
        s = self.states[f - 1].solver
 | 
			
		||||
        s.push()
 | 
			
		||||
        s.add(self.prev(Not(And(cube))))
 | 
			
		||||
        is_sat = s.check(cube)
 | 
			
		||||
        if is_sat == sat:
 | 
			
		||||
            m = s.model()
 | 
			
		||||
        s.pop()
 | 
			
		||||
        if is_sat == sat:
 | 
			
		||||
            cube = self.next(self.minimize_cube(self.project0(m), self.projectI(m), self.projectN(m)))
 | 
			
		||||
        elif is_sat == unsat:
 | 
			
		||||
            cube, f = self.generalize(cube, f)
 | 
			
		||||
            cube = self.next(cube)
 | 
			
		||||
        return cube, f, is_sat
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # Determine if there is a cube for the current state
 | 
			
		||||
    # that is potentially reachable.
 | 
			
		||||
    def unfold(self, level):
 | 
			
		||||
        core = []
 | 
			
		||||
        self.s_bad.push()
 | 
			
		||||
        R = self.R(level)
 | 
			
		||||
        self.s_bad.add(R)  # check if current frame intersects with bad states, no trans
 | 
			
		||||
        is_sat = self.s_bad.check()
 | 
			
		||||
        if is_sat == sat:
 | 
			
		||||
           m = self.s_bad.model()
 | 
			
		||||
           cube = self.project0(m)
 | 
			
		||||
           props = cube + self.projectI(m)
 | 
			
		||||
           self.s_good.push()
 | 
			
		||||
           self.s_good.add(R)
 | 
			
		||||
           is_sat2 = self.s_good.check(props)
 | 
			
		||||
           assert is_sat2 == unsat
 | 
			
		||||
           core = self.s_good.unsat_core()
 | 
			
		||||
           assert core
 | 
			
		||||
           core = [c for c in core if c in set(cube)]
 | 
			
		||||
           self.s_good.pop()
 | 
			
		||||
        self.s_bad.pop()
 | 
			
		||||
        return is_sat, core
 | 
			
		||||
 | 
			
		||||
    # A state s0 and level f0 such that
 | 
			
		||||
    # not(s0) is f0-1 inductive
 | 
			
		||||
    def quip_blocked(self, s0, f0):
 | 
			
		||||
        self.push_heap(QGoal(self.next(s0), None, f0, must, 0))
 | 
			
		||||
        while self.goals:
 | 
			
		||||
            f, g = heapq.heappop(self.goals)
 | 
			
		||||
            sys.stdout.write("%d." % f)
 | 
			
		||||
            if not g.must:
 | 
			
		||||
                self.count_may -= 1
 | 
			
		||||
            sys.stdout.flush()
 | 
			
		||||
            if f == 0:
 | 
			
		||||
                if g.must:
 | 
			
		||||
                    s = fd_solver()
 | 
			
		||||
                    s.add(self.init)
 | 
			
		||||
                    s.add(self.prev(g.cube))
 | 
			
		||||
                    # since init is a complete assignment, so g.cube must equal to init in sat solver
 | 
			
		||||
                    assert is_sat == s.check()
 | 
			
		||||
                    if verbose:
 | 
			
		||||
                        print("")
 | 
			
		||||
                    return g
 | 
			
		||||
                self.add_reachable(self.init, g.parent.cube)
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            r0 = self.reachable.intersect(self.prev(g.cube))
 | 
			
		||||
            if r0 is not None:
 | 
			
		||||
                if g.must:
 | 
			
		||||
                    if verbose:
 | 
			
		||||
                        print ""
 | 
			
		||||
                    s = fd_solver()
 | 
			
		||||
                    s.add(self.trans)
 | 
			
		||||
                    # make it as a concrete reachable state
 | 
			
		||||
                    # intersect returns an And(...), so use children to get cube list
 | 
			
		||||
                    g.cube = r0.children()
 | 
			
		||||
                    while True:
 | 
			
		||||
                        is_sat = s.check(self.next(g.cube))
 | 
			
		||||
                        assert is_sat == sat
 | 
			
		||||
                        r = self.next(self.project0(s.model()))
 | 
			
		||||
                        r = self.reachable.intersect(self.prev(r))
 | 
			
		||||
                        child = QGoal(self.next(r.children()), g, 0, g.must, 0)
 | 
			
		||||
                        g = child
 | 
			
		||||
                        if not check_disjoint(self.init, self.prev(g.cube)):
 | 
			
		||||
                            # g is init, break the loop
 | 
			
		||||
                            break
 | 
			
		||||
                    init = g
 | 
			
		||||
                    while g.parent is not None:
 | 
			
		||||
                        g.parent.level = g.level + 1
 | 
			
		||||
                        g = g.parent
 | 
			
		||||
                    return init
 | 
			
		||||
                if g.parent is not None:
 | 
			
		||||
                    self.add_reachable(r0, g.parent.cube)
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            cube = None
 | 
			
		||||
            is_sat = sat
 | 
			
		||||
            f_1 = len(self.frames) - 1
 | 
			
		||||
            while f_1 >= f:
 | 
			
		||||
                for l in self.frames[f_1]:
 | 
			
		||||
                    if not l.bad and len(l.cube) > 0 and set(l.cube).issubset(g.cube):
 | 
			
		||||
                        cube = l.cube
 | 
			
		||||
                        is_sat == unsat
 | 
			
		||||
                        break
 | 
			
		||||
                f_1 -= 1
 | 
			
		||||
            if cube is None:
 | 
			
		||||
                cube, f_1, is_sat = self.is_inductive(f, g.cube)
 | 
			
		||||
            if is_sat == unsat:
 | 
			
		||||
                self.frames[f_1].add(QLemma(self.prev(cube)))
 | 
			
		||||
                self.block_cube(f_1, self.prev(cube))
 | 
			
		||||
                if f_1 < f0:
 | 
			
		||||
                    # learned clause might also be able to block same bad states in higher level
 | 
			
		||||
                    if set(list(cube)) != set(list(g.cube)):
 | 
			
		||||
                        self.push_heap(QGoal(cube, None, f_1 + 1, may, 0))
 | 
			
		||||
                        self.count_may += 1
 | 
			
		||||
                    else:
 | 
			
		||||
                        # re-queue g.cube in higher level, here g.parent is simply for tracking down the trace when output.
 | 
			
		||||
                        self.push_heap(QGoal(g.cube, g.parent, f_1 + 1, g.must, 0))
 | 
			
		||||
                        if not g.must:
 | 
			
		||||
                            self.count_may += 1
 | 
			
		||||
            else:
 | 
			
		||||
                # qcube is a predecessor of g
 | 
			
		||||
                qcube = QGoal(cube, g, f_1 - 1, g.must, 0)
 | 
			
		||||
                if not g.must:
 | 
			
		||||
                    self.count_may += 1
 | 
			
		||||
                self.push_heap(qcube)
 | 
			
		||||
 | 
			
		||||
        if verbose:
 | 
			
		||||
            print("")
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    # Check if there are two states next to each other that have the same clauses.
 | 
			
		||||
    def is_valid(self):
 | 
			
		||||
        i = 1
 | 
			
		||||
        inv = None
 | 
			
		||||
        while True:
 | 
			
		||||
            # self.states[].R contains full lemmas
 | 
			
		||||
            # self.frames[] contains delta-encoded lemmas
 | 
			
		||||
            while len(self.states) <= i+1:
 | 
			
		||||
                self.add_solver()
 | 
			
		||||
            while len(self.frames) <= i+1:
 | 
			
		||||
                self.frames.append(set())
 | 
			
		||||
            duplicates = set([])
 | 
			
		||||
            for l in self.frames[i+1]:
 | 
			
		||||
                if l in self.frames[i]:
 | 
			
		||||
                    duplicates |= {l}
 | 
			
		||||
            self.frames[i] = self.frames[i] - duplicates
 | 
			
		||||
            pushed = set([])
 | 
			
		||||
            for l in (self.frames[i] - self.frames[i+1]):
 | 
			
		||||
                if not l.bad:
 | 
			
		||||
                    s = self.states[i].solver
 | 
			
		||||
                    s.push()
 | 
			
		||||
                    s.add(self.next(Not(l.clause)))
 | 
			
		||||
                    s.add(l.clause)
 | 
			
		||||
                    is_sat = s.check()
 | 
			
		||||
                    s.pop()
 | 
			
		||||
                    if is_sat == unsat:
 | 
			
		||||
                        self.frames[i+1].add(l)
 | 
			
		||||
                        self.states[i+1].add(l.clause)
 | 
			
		||||
                        pushed |= {l}
 | 
			
		||||
            self.frames[i] = self.frames[i] - pushed
 | 
			
		||||
            if (not (self.states[i].R - self.states[i+1].R)
 | 
			
		||||
                and len(self.states[i].R) != 0):
 | 
			
		||||
                inv = prune(self.states[i].R)
 | 
			
		||||
                F_inf = self.frames[i]
 | 
			
		||||
                j = i + 1
 | 
			
		||||
                while j < len(self.states):
 | 
			
		||||
                    for l in F_inf:
 | 
			
		||||
                        self.states[j].add(l.clause)
 | 
			
		||||
                    j += 1
 | 
			
		||||
                self.frames[len(self.states)-1] = F_inf
 | 
			
		||||
                self.frames[i] = set([])
 | 
			
		||||
                break
 | 
			
		||||
            elif (len(self.states[i].R) == 0
 | 
			
		||||
                  and len(self.states[i+1].R) == 0):
 | 
			
		||||
                break
 | 
			
		||||
            i += 1
 | 
			
		||||
 | 
			
		||||
        if inv is not None:
 | 
			
		||||
            self.s_bad.push()
 | 
			
		||||
            self.s_bad.add(And(inv))
 | 
			
		||||
            is_sat = self.s_bad.check()
 | 
			
		||||
            if is_sat == unsat:
 | 
			
		||||
                self.s_bad.pop()
 | 
			
		||||
                return And(inv)
 | 
			
		||||
            self.s_bad.pop()
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        if not check_disjoint(self.init, self.bad):
 | 
			
		||||
            return "goal is reached in initial state"
 | 
			
		||||
        level = 0
 | 
			
		||||
        while True:
 | 
			
		||||
            inv = self.is_valid()  # self.add_solver() here
 | 
			
		||||
            if inv is not None:
 | 
			
		||||
                return inv
 | 
			
		||||
            is_sat, cube = self.unfold(level)
 | 
			
		||||
            if is_sat == unsat:
 | 
			
		||||
                level += 1
 | 
			
		||||
                if verbose:
 | 
			
		||||
                    print("Unfold %d" % level)
 | 
			
		||||
                sys.stdout.flush()
 | 
			
		||||
            elif is_sat == sat:
 | 
			
		||||
                cex = self.quip_blocked(cube, level)
 | 
			
		||||
                if cex is not None:
 | 
			
		||||
                    return cex
 | 
			
		||||
            else:
 | 
			
		||||
                return is_sat
 | 
			
		||||
 | 
			
		||||
def test(file):
 | 
			
		||||
    h2t = Horn2Transitions()
 | 
			
		||||
    h2t.parse(file)
 | 
			
		||||
    if verbose:
 | 
			
		||||
        print("Test file: %s") % file
 | 
			
		||||
    mp = Quip(h2t.init, h2t.trans, h2t.goal, h2t.xs, h2t.inputs, h2t.xns)
 | 
			
		||||
    start_time = time.time()
 | 
			
		||||
    result = mp.run()
 | 
			
		||||
    end_time = time.time()
 | 
			
		||||
    if isinstance(result, QGoal):
 | 
			
		||||
        g = result
 | 
			
		||||
        if verbose:
 | 
			
		||||
            print("Trace")
 | 
			
		||||
        while g:
 | 
			
		||||
           if verbose:
 | 
			
		||||
               print(g.level, g.cube)
 | 
			
		||||
           g = g.parent
 | 
			
		||||
        print("--- used %.3f seconds ---" % (end_time - start_time))
 | 
			
		||||
        validate(mp, result, mp.trans)
 | 
			
		||||
        return
 | 
			
		||||
    if isinstance(result, ExprRef):
 | 
			
		||||
        if verbose:
 | 
			
		||||
            print("Invariant:\n%s " % result)
 | 
			
		||||
        print("--- used %.3f seconds ---" % (end_time - start_time))
 | 
			
		||||
        validate(mp, result, mp.trans)
 | 
			
		||||
        return
 | 
			
		||||
    print(result)
 | 
			
		||||
 | 
			
		||||
def validate(var, result, trans):
 | 
			
		||||
    if isinstance(result, QGoal):
 | 
			
		||||
        g = result
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(trans)
 | 
			
		||||
        while g.parent is not None:
 | 
			
		||||
            s.push()
 | 
			
		||||
            s.add(var.prev(g.cube))
 | 
			
		||||
            s.add(var.next(g.parent.cube))
 | 
			
		||||
            assert sat == s.check()
 | 
			
		||||
            s.pop()
 | 
			
		||||
            g = g.parent
 | 
			
		||||
        if verbose:
 | 
			
		||||
            print "--- validation succeed ----"
 | 
			
		||||
        return
 | 
			
		||||
    if isinstance(result, ExprRef):
 | 
			
		||||
        inv = result
 | 
			
		||||
        s = fd_solver()
 | 
			
		||||
        s.add(trans)
 | 
			
		||||
        s.push()
 | 
			
		||||
        s.add(var.prev(inv))
 | 
			
		||||
        s.add(Not(var.next(inv)))
 | 
			
		||||
        assert unsat == s.check()
 | 
			
		||||
        s.pop()
 | 
			
		||||
        cube = var.prev(var.init)
 | 
			
		||||
        step = 0
 | 
			
		||||
        while True:
 | 
			
		||||
            step += 1
 | 
			
		||||
            # too many steps to reach invariant
 | 
			
		||||
            if step > 1000:
 | 
			
		||||
                if verbose:
 | 
			
		||||
                    print "--- validation failed --"
 | 
			
		||||
                return
 | 
			
		||||
            if not check_disjoint(var.prev(cube), var.prev(inv)):
 | 
			
		||||
                # reach invariant
 | 
			
		||||
                break
 | 
			
		||||
            s.push()
 | 
			
		||||
            s.add(cube)
 | 
			
		||||
            assert s.check() == sat
 | 
			
		||||
            cube = var.projectN(s.model())
 | 
			
		||||
            s.pop()
 | 
			
		||||
        if verbose:
 | 
			
		||||
            print "--- validation succeed ----"
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
test("data/horn1.smt2")
 | 
			
		||||
test("data/horn2.smt2")
 | 
			
		||||
test("data/horn3.smt2")
 | 
			
		||||
test("data/horn4.smt2")
 | 
			
		||||
test("data/horn5.smt2")
 | 
			
		||||
# test("data/horn6.smt2")  # not able to finish
 | 
			
		||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue