Source code for avl_axi._item

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Sequence Item

from collections import defaultdict
from collections.abc import MutableMapping, MutableSequence

from typing import Any

import avl
from z3 import ULE, BitVecVal, Implies, Or, ZeroExt

from ._signals import ar_m_signals, aw_m_signals, b_s_signals, is_random, r_s_signals, w_m_signals
from ._types import axi_atomic_t, axi_burst_t, axi_resp_t, signal_to_type
from ._utils import get_burst_addresses

[docs] class SequenceItem(avl.SequenceItem):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the sequence item :param name: Name of the sequence item :param parent: Parent component of the sequence item :return: None """ super().__init__(name, parent) # Handle to interface - defines capabilities and parameters self._i_f_ = avl.Factory.get_variable(f"{self.get_full_name()}.i_f", None) # Add events for finer grained sequence control self.add_event("awake") self.add_event("control") self.add_event("data") # By default transpose to make more readable self.set_table_fmt(transpose=True) # Local / Hidden Parameters for k, v in vars(self._i_f_).items(): if isinstance(v, (int | str)): setattr(self, f"_{k}_", v)
[docs] def resize(self, size : int = None, randomize : bool = False) -> None: """ Re-size transaction data fields based on len :param size: New size of the transaction (len+1) - if None use current len+1 :type size: int :param randomize: Randomize any un-created fields :type randomize: bool :return: None """ if size is None: n = self.get_len()+1 else: n = size # Re-Size Write Data for s in w_m_signals + ["w_wait_cycles"]: if hasattr(self, s): v = getattr(self, s) for i in range(n): if i not in v: v[i].value = 0 if randomize and v[i]._auto_random_: v[i].randomize() # Re-Size Read Data for s in r_s_signals + ["r_wait_cycles"]: if hasattr(self, s): if not self.has_rresp(): delattr(self, s) else: v = getattr(self, s) for i in range(n): if i not in v: v[i].value = 0 if randomize and v[i]._auto_random_: v[i].randomize() # Force IDs, loop to match if hasattr(self, "bid"): self.set("bid", self.get("awid", default=0)) if hasattr(self, "bloop"): self.set("bloop", self.get("awloop", default=0)) if hasattr(self, "btrace"): self.set("btrace", self.get("awtrace", default=0)) if hasattr(self, "bidunq"): self.set("bidunq", self.get("awidunq")) for i in range(n): if hasattr(self, "rid"): self.set("rid", self.get_id(), idx=i) if hasattr(self, "rloop"): self.set("rloop", self.get("arloop", default=0), idx=i) if hasattr(self, "rtrace"): self.set("rtrace", self.get("artrace", default=0), idx=i) if hasattr(self, "ridunq"): self.set("ridunq", self.get_idunq(), idx=i)
[docs] def sanity(self) -> None: """ Sanity Check fields vs. spec e.g. - command / response fields that should match - parameters that enforce values """ # Check length for s in w_m_signals + ["w_wait_cycles"] + r_s_signals + ["r_wait_cycles"]: if hasattr(self, s): v = getattr(self, s) assert len(v) == self.get_len() + 1 # Check size <= buswidth assert (1 << self.get("arsize", default=0)) <= self._i_f_.DATA_WIDTH // 8 assert (1 << self.get("awsize", default=0)) <= self._i_f_.DATA_WIDTH // 8 # Fixed burst disable must be obeyed if self._Fixed_Burst_Disable_ or self._Regular_Transactions_Only_: assert self.get_burst() != axi_burst_t.FIXED # Max Transaction Bytes must be obeyed assert (self.get_len()+1)*self.get_size() <= self._Max_Transaction_Bytes_ # Regular Transactions limitations must be obeyed if self._Regular_Transactions_Only_: # Limited Range of lengths assert self.get_len() in [0,1,3,7,15] # Bursts must used whole bus width if self.get_len() != 0: assert self.get_size() == self._DATA_WIDTH_/8 # Address Alignment if self.get_burst() == axi_burst_t.INCR: assert self.get_addr() % ((self.get_len()+1)*self.get_size()) == 0 elif self.get_burst() == axi_burst_t.WRAP: assert self.get_addr() % self.get_size() == 0 else: raise ValueError("Unexpected burst type") if self.has_bresp(): # Signals which must match command -> response assert self.get_id() == self.get("bid", default=0) assert self.get_loop() == self.get("bloop", default=0) assert self.get_trace() == self.get("btrace", default=0) assert self.get_idunq() == self.get("bidunq", default=0) if self.has_rresp(): for i in range(self.get_len()+1): # Signals which must match command -> response assert self.get_id() == self.get("rid", default=0, idx=i) assert self.get_loop() == self.get("rloop", default=0, idx=i) assert self.get_trace() == self.get("rtrace", default=0, idx=i) assert self.get_idunq() == self.get("ridunq", default=0, idx=i) # Consistent DECERR - any decerr in responses must cause all decerr if self._Consistent_DECERR_: assert self.get("rresp", default=axi_resp_t.OKAY, idx=i) == self.get("rresp", default=axi_resp_t.OKAY, idx=0)
[docs] def post_randomize(self): """ Post Randomize actions """ super().post_randomize() self.resize(randomize=True) # Force alignment for Regular Transaction - quicker than constraint if self._Regular_Transactions_Only_: addr = self.get_addr() if self.get_burst() == axi_burst_t.INCR: mask = ((self.get_len()+1)*self.get_size()) -1 elif self.get_burst() == axi_burst_t.WRAP: mask = self.get_size() -1 else: raise ValueError(f"Unexpected burst_type_t {self.get('aw')}") self.set_addr(addr & ~mask)
[docs] def set(self, name : str, value : int, idx : int = None) -> None: """ Set the value of a field in the sequence item - if it exists. :param name: Name of the field to set :param value: Value to set for the field :return: None """ signal = getattr(self, name, None) if isinstance(signal, (MutableSequence | MutableMapping | tuple)): if idx is not None: signal[idx].value = int(value) else: for i,v in enumerate(value): signal[i].value = int(v) elif signal is not None: signal.value = int(value)
[docs] def get(self, name : str, idx : int = None, default : Any = None) -> int: """ Get the value of a field in the sequence item - if it exists. :param name: Name of the field to get :param default: Default value to return if the field does not exist :return: Value of the field or default value """ signal = getattr(self, name, None) if isinstance(signal, (MutableSequence | MutableMapping | tuple)): if idx is not None: return signal[idx].value else: return signal elif signal is not None: return signal.value return default
[docs] def get_addr(self) -> int: """ Return the address :return: Addr (awaddr, araddr) """ if hasattr(self, "awaddr"): return int(self.awaddr) else: return int(self.araddr)
[docs] def set_addr(self, addr : int) -> None: """ Set the address :return: Addr (awaddr, araddr) """ if hasattr(self, "awaddr"): self.set("awaddr", addr) else: self.set("araddr", addr)
[docs] def get_id(self) -> int: """ Return ID :return: ID (awid, arid) """ if hasattr(self, "awaddr"): return int(self.get("awid", default=0)) else: return int(self.get("arid", default=0))
[docs] def get_idunq(self) -> int: """ Return ID Unique :return: ID Unique (awidunq, aridunq) """ if hasattr(self, "awaddr"): return int(self.get("awidunq", default=0)) else: return int(self.get("aridunq", default=0))
[docs] def get_tagop(self) -> int: """ Return TAG operation :return: TAG operation (awtagop, artagop) """ if hasattr(self, "awaddr"): return int(self.get("awtagop", default=0)) else: return int(self.get("artagop", default=0))
[docs] def get_len(self) -> int: """ Return Length :return: Length (arlen or awlen) """ if hasattr(self, "awaddr"): return int(self.get("awlen", default=0)) else: return int(self.get("arlen", default=0))
[docs] def get_size(self) -> int: """ Return Size (in bytes) :return: Size """ if hasattr(self, "awaddr"): return int(1<<self.get("awsize", default=0)) else: return int(1<<self.get("arsize", default=0))
[docs] def get_burst(self) -> int: """ Return Burst :return: Burst """ if hasattr(self, "awaddr"): return int(self.get("awburst", default=axi_burst_t.INCR)) else: return int(self.get("arburst", default=axi_burst_t.INCR))
[docs] def get_loop(self) -> int: """ Return Loop :return: Length (arloop or awloop) """ if hasattr(self, "awaddr"): return int(self.get("awloop", default=0)) else: return int(self.get("arloop", default=0))
[docs] def get_trace(self) -> int: """ Return Trace :return: Length (artrace or awtrace) """ if hasattr(self, "awaddr"): return int(self.get("awtrace", default=0)) else: return int(self.get("artrace", default=0))
[docs] def has_bresp(self) -> bool: """ Expect a response on bresp channel """ if hasattr(self, "awatop"): return self.awatop.has_bresp() elif hasattr(self, "awaddr"): return True return False
[docs] def has_rresp(self) -> bool: """ Expect a response on resp channel """ if hasattr(self, "awatop"): return self.awatop.has_rresp() elif hasattr(self, "araddr"): return True return False
[docs] class WriteItem(SequenceItem):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: # noqa: C901 """ Initialize the sequence item :param name: Name of the sequence item :param parent: Parent component of the sequence item """ super().__init__(name, parent) if hasattr(self._i_f_, "awakeup"): self.goto_sleep = avl.Logic(0, width=len(self._i_f_.awakeup), fmt=str) # Write Control Signals for s in aw_m_signals: if s in ["awvalid"]: continue if hasattr(self._i_f_, s): v = getattr(self._i_f_, s) setattr(self, s, signal_to_type(s)(0, width=len(v), auto_random=is_random(s))) # Write Data Signals for s in w_m_signals: if s in ["wvalid", "wlast"]: continue if hasattr(self._i_f_, s): v = getattr(self._i_f_, s) setattr(self, s, defaultdict(lambda s=s,v=v: signal_to_type(s)(0, width=len(v), auto_random=is_random(s)))) # Read Response Signals - atomic loads for s in r_s_signals: if s in ["rvalid", "rlast"]: continue if hasattr(self._i_f_, s): v = getattr(self._i_f_, s) setattr(self, s, defaultdict(lambda s=s,v=v: signal_to_type(s)(0, width=len(v), auto_random=is_random(s)))) # Write Response Signals for s in b_s_signals: if s in ["bvalid"]: continue if hasattr(self._i_f_, s): v = getattr(self._i_f_, s) setattr(self, s, signal_to_type(s)(0, width=len(v), auto_random=is_random(s))) # Wait cycles self.aw_wait_cycles = avl.Uint8(0, auto_random=False) """Wait cycles between control awvalid and control awready""" self.set_field_attributes("aw_wait_cycles", compare=False) self.w_wait_cycles = defaultdict(lambda: avl.Uint8(0, auto_random=False)) """Wait cycles between data wvalid and data wready""" self.set_field_attributes("w_wait_cycles", compare=False) self.b_wait_cycles = avl.Uint8(0, auto_random=False) """Wait cycles between data bvalid and data bready""" self.set_field_attributes("b_wait_cycles", compare=False) # Reduced signals used for coverage self._wdata_ = [avl.Logic(0, width=self.wdata[0].width), avl.Logic(-1, width=self.wdata[0].width)] if hasattr(self, "wstrb"): self._wstrb_ = [avl.Logic(0, width=self.wstrb[0].width), avl.Logic(-1, width=self.wstrb[0].width)] if hasattr(self, "wuser"): self._wuser_ = [avl.Logic(0, width=self.wuser[0].width), avl.Logic(-1, width=self.wuser[0].width)] if hasattr(self, "wpoison"): self._wpoison_ = [avl.Logic(0, width=self.wpoison[0].width), avl.Logic(-1, width=self.wpoison[0].width)] if hasattr(self, "wtrace"): self._wtrace_ = [avl.Logic(0, width=self.wtrace[0].width), avl.Logic(-1, width=self.wtrace[0].width)] # Constraints if hasattr(self, "awsize"): self.add_constraint("c_awsize", lambda x : ULE(x, BitVecVal((self._i_f_.DATA_WIDTH//8).bit_length()-1, x.size())), self.awsize) if hasattr(self, "awlen"): if hasattr(self, "awsize"): self.add_constraint("c_max_transaction_bytes", lambda x,y : ULE(((ZeroExt(8, x) + BitVecVal(1, 16)) << ZeroExt(13, y)), BitVecVal(self._i_f_.Max_Transaction_Bytes,16)), self.awlen, self.awsize) else: self.add_constraint("c_max_transaction_bytes", lambda x : ULE((ZeroExt(8, x) + BitVecVal(1, 16)), BitVecVal(self._i_f_.Max_Transaction_Bytes,16)), self.awlen) elif hasattr(self, "awsize"): self.add_constraint("c_max_transaction_bytes", lambda y : ULE(1 << ZeroExt(13, y), BitVecVal(self._i_f_.Max_Transaction_Bytes,16)), self.awsize) if hasattr(self, "awatop"): self.add_constraint("c_awatop_legal", lambda x: Or(*(x == v for v in self.awatop.values())), self.awatop) self.add_constraint("c_awatop_size", lambda x,y : Implies(x != axi_atomic_t.NON_ATOMIC, ULE(y,5)), self.awatop, self.awsize) if hasattr(self, "swstashniden"): self.add_constraint("c_swstashnid", lambda x,y : Implies(y == 0, x == 0), self.awstashniden, self.awstashnid) if hasattr(self, "swstashlpiden"): self.add_constraint("c_swstashlpid", lambda x,y : Implies(y == 0, x == 0), self.awstashlpiden, self.awstashlpid) if hasattr(self, "awtagop"): self.add_constraint("c_awtagop_burst", lambda x,y : Implies(x != 0, y != axi_burst_t.FIXED), self.awtagop, self.awburst) self.add_constraint("c_awtagop_cache", lambda x,y : Implies(x != 0, y & 0b1111 == 0b0011), self.awtagop, self.awcache) if hasattr(self, "awidunq"): self.add_constraint("c_awtagop_cache", lambda x,y : Implies(x != 0, y == 1), self.awtagop, self.awidunq) if hasattr(self, "awburst") and self._i_f_.Fixed_Burst_Disable: self.add_constraint("c_fixed_burst_disable", lambda x : x != axi_burst_t.FIXED, self.awburst) if self._i_f_.Regular_Transactions_Only: if hasattr(self, "awlen"): self.add_constraint("c_regular_len", lambda x : Or(x==0, x==1, x==3, x==7, x==15), self.awlen) if hasattr(self, "awsize"): self.add_constraint("c_regular_size", lambda x,y: Implies(y != 0, (1 << ZeroExt(8, x) == self._i_f_.DATA_WIDTH/8)), self.awsize, self.awlen) if hasattr(self, "awburst"): self.add_constraint("c_regualr_burst", lambda x : x != axi_burst_t.FIXED, self.awburst)
[docs] def post_randomize(self): """ Post Randomize actions """ super().post_randomize() # Post process wstrbs to be legal # Difficult to constrain so patch if hasattr(self, "wstrb"): addresses = get_burst_addresses(self.awaddr, self.get("awlen", default=0), self.get("awsize", default=0), self.get("awburst", default=axi_burst_t.INCR)) wstrb_mask = (1 << (1 << self.get("awsize", default=0))) - 1 for i,a in enumerate(addresses): offset = a & (self._i_f_.DATA_WIDTH//8)-1 self.wstrb[i] &= (wstrb_mask << offset)
[docs] class ReadItem(SequenceItem):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: # noqa: C901 """ Initialize the sequence item :param name: Name of the sequence item :param parent: Parent component of the sequence item """ super().__init__(name, parent) if hasattr(self._i_f_, "awakeup"): self.goto_sleep = avl.Logic(0, width=len(self._i_f_.awakeup), fmt=str) # Read Control Signals for s in ar_m_signals: if s in ["arvalid"]: continue if hasattr(self._i_f_, s): v = getattr(self._i_f_, s) setattr(self, s, signal_to_type(s)(0, width=len(v), auto_random=is_random(s))) # Read Response Signals for s in r_s_signals: if s in ["rvalid", "rlast"]: continue if hasattr(self._i_f_, s): v = getattr(self._i_f_, s) setattr(self, s, defaultdict(lambda s=s,v=v: signal_to_type(s)(0, width=len(v), auto_random=is_random(s)))) # Wait cycles self.ar_wait_cycles = avl.Uint8(0, auto_random=False) """Wait cycles between control arvalid and control arready""" self.set_field_attributes("ar_wait_cycles", compare=False) self.r_wait_cycles = defaultdict(lambda: avl.Uint8(0, auto_random=False)) """Wait cycles between data rvalid and data rready""" self.set_field_attributes("r_wait_cycles", compare=False) # Reduced signals used for coverage self._rdata_ = [avl.Logic(0, width=self.rdata[0].width), avl.Logic(-1, width=self.rdata[0].width)] if hasattr(self, "rresp"): self._rresp_ = [avl.Logic(0, width=self.rresp[0].width), avl.Logic(-1, width=self.rresp[0].width)] if hasattr(self, "ruser"): self._ruser_ = [avl.Logic(0, width=self.ruser[0].width), avl.Logic(-1, width=self.ruser[0].width)] if hasattr(self, "rpoison"): self._rpoison_ = [avl.Logic(0, width=self.rpoison[0].width), avl.Logic(-1, width=self.rpoison[0].width)] if hasattr(self, "rtrace"): self._rtrace_ = [avl.Logic(0, width=self.rtrace[0].width), avl.Logic(-1, width=self.rtrace[0].width)] if hasattr(self, "rloop"): self._rloop_ = [avl.Logic(0, width=self.rloop[0].width), avl.Logic(-1, width=self.rloop[0].width)] # Constraints if hasattr(self, "arsize"): self.add_constraint("c_arsize", lambda x : ULE(x, BitVecVal((self._i_f_.DATA_WIDTH//8).bit_length()-1, x.size())), self.arsize) if hasattr(self, "arlen"): if hasattr(self, "arsize"): self.add_constraint("c_max_transaction_bytes", lambda x,y : ULE(((ZeroExt(8, x) + 1) << ZeroExt(13, y)), BitVecVal(self._i_f_.Max_Transaction_Bytes, 16)), self.arlen, self.arsize) else: self.add_constraint("c_max_transaction_bytes", lambda x : (ULE(ZeroExt(8, x) +1), BitVecVal(self._i_f_.Max_Transaction_Bytes, 16)), self.arlen) elif hasattr(self, "arsize"): self.add_constraint("c_max_transaction_bytes", lambda y : ULE(1 << ZeroExt(13, y), BitVecVal(self._i_f_.Max_Transaction_Bytes, 16)), self.arsize) if hasattr(self, "artagop"): self.add_constraint("c_artagop_reserved", lambda x : x != 0b10, self.artagop) self.add_constraint("c_artagop_rtag", lambda x,y : Implies(x == 0, y == 0), self.artagop, self.rtag) self.add_constraint("c_artagop_burst", lambda x,y : Implies(x != 0, y != axi_burst_t.FIXED), self.artagop, self.arburst) self.add_constraint("c_artagop_cache", lambda x,y : Implies(x != 0, y & 0b1111 == 0b0011), self.artagop, self.arcache) if hasattr(self, "aridunq"): self.add_constraint("c_artagop_cache", lambda x,y : Implies(x != 0, y == 1), self.artagop, self.aridunq) if hasattr(self, "arburst") and self._i_f_.Fixed_Burst_Disable: self.add_constraint("c_fixed_burst_disable", lambda x : x != axi_burst_t.FIXED, self.arburst) if self._i_f_.Regular_Transactions_Only: if hasattr(self, "arlen"): self.add_constraint("c_regular_len", lambda x : Or(x==0, x==1, x==3, x==7, x==15), self.arlen) if hasattr(self, "arsize"): self.add_constraint("c_regular_size", lambda x,y: Implies(y != 0, (1 << ZeroExt(8, x) == self._i_f_.DATA_WIDTH/8)), self.arsize, self.arlen) if hasattr(self, "arburst"): self.add_constraint("c_regualr_burst", lambda x : x != axi_burst_t.FIXED, self.arburst)
[docs] def post_randomize(self): """ Post Randomize actions """ super().post_randomize() if self._Consistent_DECERR_: for i in range(len(self.resp)): if self.rresp[i] == axi_resp_t.DECERR: for j in range(len(self.rresp)): self.rresp[j] = axi_resp_t.DECERR return
__all__ = ["SequenceItem", "WriteItem", "ReadItem"]