Queue formal proof based on one-entry FIFO equivalence #11

Merged
programmerjake merged 3 commits from cesar/fayalite:fifo-proof into master 2024-12-29 21:05:26 +00:00
Showing only changes of commit 31d01046a8 - Show all commits

View file

@ -195,7 +195,7 @@ mod tests {
assert_formal( assert_formal(
format_args!("test_queue_{capacity}_{inp_ready_is_comb}_{out_valid_is_comb}"), format_args!("test_queue_{capacity}_{inp_ready_is_comb}_{out_valid_is_comb}"),
queue_test(capacity, inp_ready_is_comb, out_valid_is_comb), queue_test(capacity, inp_ready_is_comb, out_valid_is_comb),
FormalMode::Prove, FormalMode::BMC,
programmerjake marked this conversation as resolved Outdated

imo this should be changed this back to Prove before merging. ah, you already said that...

imo this should be changed this back to Prove before merging. ah, you already said that...
14, 14,
None, None,
ExportOptions { ExportOptions {
@ -203,6 +203,16 @@ mod tests {
..ExportOptions::default() ..ExportOptions::default()
}, },
); );
/// Formal verification of the FIFO queue
///
/// The strategy derives from the observation that, if we filter its
/// input and output streams to consider just one in every N reads and
/// writes (where N is the FIFO capacity), then the FIFO effectively
/// behaves as a one-entry FIFO.
///
/// In particular, any counterexample of the full FIFO behaving badly
/// will also be caught by one of the filtered versions (one which
/// happens to be in phase with the offending input or output).
#[hdl_module] #[hdl_module]
fn queue_test(capacity: NonZeroUsize, inp_ready_is_comb: bool, out_valid_is_comb: bool) { fn queue_test(capacity: NonZeroUsize, inp_ready_is_comb: bool, out_valid_is_comb: bool) {
#[hdl] #[hdl]
@ -217,6 +227,8 @@ mod tests {
rst: formal_reset().to_reset(), rst: formal_reset().to_reset(),
}, },
); );
// random input data
#[hdl] #[hdl]
let inp_data: HdlOption<UInt<8>> = wire(); let inp_data: HdlOption<UInt<8>> = wire();
#[hdl] #[hdl]
@ -225,16 +237,25 @@ mod tests {
} else { } else {
connect(inp_data, HdlNone()); connect(inp_data, HdlNone());
programmerjake marked this conversation as resolved Outdated
here use `a one-entry` -- [the `o` makes a `w` sound so is treated as a consonant for `a` vs. `an`](https://owl.purdue.edu/owl/general_writing/grammar/articles_a_versus_an.html#:~:text=or%20%22o%22%20makes%20the%20same%20sound%20as%20%22w%22%20in%20%22won%2C)
} }
// assert output ready at random
#[hdl] #[hdl]
let out_ready: Bool = wire(); let out_ready: Bool = wire();
connect(out_ready, any_seq(Bool)); connect(out_ready, any_seq(Bool));
let index_ty: UInt<32> = UInt::TYPE;
// The current number of elements in the FIFO ranges from zero to
// maximum capacity, inclusive.
let count_ty = UInt::range_inclusive(0..=capacity.get());
// type for counters that wrap around at the FIFO capacity
let index_ty = UInt::range(0..capacity.get());
// among all entries of the FIFO internal circular memory, choose
// one at random to check
#[hdl] #[hdl]
let index_to_check = wire(); let index_to_check = wire(index_ty);
connect(index_to_check, any_const(index_ty)); connect(index_to_check, any_const(index_ty));
let index_max = !index_ty.zero();
// we saturate at index_max, so only check indexes where we properly maintain position // instantiate and connect the queue
hdl_assume(clk, index_to_check.cmp_ne(index_max), "");
#[hdl] #[hdl]
let dut = instance(queue( let dut = instance(queue(
UInt[ConstUsize::<8>], UInt[ConstUsize::<8>],
@ -245,108 +266,121 @@ mod tests {
connect(dut.cd, cd); connect(dut.cd, cd);
connect(dut.inp.data, inp_data); connect(dut.inp.data, inp_data);
connect(dut.out.ready, out_ready); connect(dut.out.ready, out_ready);
hdl_assume(
clk,
index_to_check.cmp_ne(!Expr::ty(index_to_check).zero()),
"",
);
// Keep an independent count of words in the FIFO. Ensure that
// it's always correct, and never overflows.
#[hdl] #[hdl]
let expected_count_reg = reg_builder().clock_domain(cd).reset(0u32); let expected_count_reg = reg_builder().clock_domain(cd).reset(count_ty.zero());
#[hdl]
let next_expected_count = wire();
connect(next_expected_count, expected_count_reg);
connect(expected_count_reg, next_expected_count);
#[hdl] #[hdl]
if ReadyValid::firing(dut.inp) & !ReadyValid::firing(dut.out) { if ReadyValid::firing(dut.inp) & !ReadyValid::firing(dut.out) {
connect_any(next_expected_count, expected_count_reg + 1u8); hdl_assert(clk, expected_count_reg.cmp_ne(capacity.get()), "");
connect_any(expected_count_reg, expected_count_reg + 1u8);
} else if !ReadyValid::firing(dut.inp) & ReadyValid::firing(dut.out) { } else if !ReadyValid::firing(dut.inp) & ReadyValid::firing(dut.out) {
connect_any(next_expected_count, expected_count_reg - 1u8); hdl_assert(clk, expected_count_reg.cmp_ne(count_ty.zero()), "");
connect_any(expected_count_reg, expected_count_reg - 1u8);
} }
hdl_assert(cd.clk, expected_count_reg.cmp_eq(dut.count), ""); hdl_assert(clk, expected_count_reg.cmp_eq(dut.count), "");
#[hdl]
let prev_out_ready_reg = reg_builder().clock_domain(cd).reset(!0_hdl_u3);
connect_any(
prev_out_ready_reg,
(prev_out_ready_reg << 1) | out_ready.cast_to(UInt[1]),
);
#[hdl]
let prev_inp_valid_reg = reg_builder().clock_domain(cd).reset(!0_hdl_u3);
connect_any(
prev_inp_valid_reg,
(prev_inp_valid_reg << 1) | HdlOption::is_some(inp_data).cast_to(UInt[1]),
);
hdl_assume(
clk,
(prev_out_ready_reg & prev_inp_valid_reg).cmp_ne(0u8),
"",
);
// keep an independent write index into the FIFO's circular buffer
#[hdl] #[hdl]
let inp_index_reg = reg_builder().clock_domain(cd).reset(index_ty.zero()); let inp_index_reg = reg_builder().clock_domain(cd).reset(index_ty.zero());
#[hdl] #[hdl]
let stored_inp_data_reg = reg_builder().clock_domain(cd).reset(0u8); if ReadyValid::firing(dut.inp) {
#[hdl] #[hdl]
if let HdlSome(data) = ReadyValid::firing_data(dut.inp) { if inp_index_reg.cmp_ne(capacity.get() - 1) {
#[hdl]
if inp_index_reg.cmp_lt(index_max) {
connect_any(inp_index_reg, inp_index_reg + 1u8); connect_any(inp_index_reg, inp_index_reg + 1u8);
#[hdl] } else {
if inp_index_reg.cmp_eq(index_to_check) { connect_any(inp_index_reg, 0_hdl_u0);
connect(stored_inp_data_reg, data);
}
} }
} }
#[hdl] // keep an independent read index into the FIFO's circular buffer
if inp_index_reg.cmp_lt(index_to_check) {
hdl_assert(clk, stored_inp_data_reg.cmp_eq(0u8), "");
}
#[hdl] #[hdl]
let out_index_reg = reg_builder().clock_domain(cd).reset(index_ty.zero()); let out_index_reg = reg_builder().clock_domain(cd).reset(index_ty.zero());
#[hdl] #[hdl]
let stored_out_data_reg = reg_builder().clock_domain(cd).reset(0u8); if ReadyValid::firing(dut.out) {
#[hdl] #[hdl]
if let HdlSome(data) = ReadyValid::firing_data(dut.out) { if out_index_reg.cmp_ne(capacity.get() - 1) {
#[hdl]
if out_index_reg.cmp_lt(index_max) {
connect_any(out_index_reg, out_index_reg + 1u8); connect_any(out_index_reg, out_index_reg + 1u8);
#[hdl]
if out_index_reg.cmp_eq(index_to_check) {
connect(stored_out_data_reg, data);
}
}
}
#[hdl]
if out_index_reg.cmp_lt(index_to_check) {
hdl_assert(clk, stored_out_data_reg.cmp_eq(0u8), "");
}
hdl_assert(clk, inp_index_reg.cmp_ge(out_index_reg), "");
#[hdl]
if inp_index_reg.cmp_lt(index_max) & out_index_reg.cmp_lt(index_max) {
hdl_assert(
clk,
expected_count_reg.cmp_eq(inp_index_reg - out_index_reg),
"",
);
} else { } else {
hdl_assert( connect_any(out_index_reg, 0_hdl_u0);
clk, }
expected_count_reg.cmp_ge(inp_index_reg - out_index_reg),
"",
);
} }
// filter the input data stream, predicated by the read index
// matching the chosen position in the FIFO's circular buffer
#[hdl] #[hdl]
if inp_index_reg.cmp_gt(index_to_check) & out_index_reg.cmp_gt(index_to_check) { let inp_index_matches = wire();
hdl_assert(clk, stored_inp_data_reg.cmp_eq(stored_out_data_reg), ""); connect(inp_index_matches, inp_index_reg.cmp_eq(index_to_check));
#[hdl]
let inp_firing_data = wire();
connect(inp_firing_data, HdlNone());
#[hdl]
if inp_index_matches {
connect(inp_firing_data, ReadyValid::firing_data(dut.inp));
}
// filter the output data stream, predicated by the write index
// matching the chosen position in the FIFO's circular buffer
#[hdl]
let out_index_matches = wire();
connect(out_index_matches, out_index_reg.cmp_eq(index_to_check));
#[hdl]
let out_firing_data = wire();
connect(out_firing_data, HdlNone());
#[hdl]
if out_index_matches {
connect(out_firing_data, ReadyValid::firing_data(dut.out));
}
// Implement a one-entry FIFO and ensure its equivalence to the
// filtered FIFO.
//
// the holding register for our one-entry FIFO
#[hdl]
let stored_reg = reg_builder().clock_domain(cd).reset(HdlNone());
#[hdl]
match stored_reg {
// If the holding register is empty...
HdlNone => {
#[hdl]
match inp_firing_data {
// ... and we are not receiving data, then we must not
// transmit any data.
HdlNone => hdl_assert(clk, HdlOption::is_none(out_firing_data), ""),
// If we are indeed receiving some data...
HdlSome(data_in) => {
#[hdl]
match out_firing_data {
// ... and transmitting at the same time, we
// must be transmitting the input data itself,
// since the holding register is empty.
HdlSome(data_out) => hdl_assert(clk, data_out.cmp_eq(data_in), ""),
// If we are receiving, but not transmitting,
// store the received data in the holding
// register.
HdlNone => connect(stored_reg, HdlSome(data_in)),
}
}
programmerjake marked this conversation as resolved Outdated

same here

same here
}
}
// If there is some value stored in the holding register...
HdlSome(stored) => {
#[hdl]
match out_firing_data {
// ... and we are not transmitting it, we cannot
// receive any more data.
HdlNone => hdl_assert(clk, HdlOption::is_none(inp_firing_data), ""),
// If we are transmitting a previously stored value...
HdlSome(data_out) => {
// ... it must be the same data we stored earlier.
hdl_assert(clk, data_out.cmp_eq(stored), "");
// Also, accept new data, if any. Otherwise,
// let the holding register become empty.
connect(stored_reg, inp_firing_data);
}
}
}
} }
} }
} }