Source code for avl_axi._smemory

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Subordinate Memory


import avl

from ._item import ReadItem, WriteItem
from ._types import axi_atomic_t, axi_resp_t
from ._utils import get_burst_addresses


[docs] class SubordinateMemory(avl.Memory):
[docs] def __init__(self, width : int = 32) -> None: """ Initialize the Memory for the AXI Subordinate Driver :param name: Name of the agent instance :type name: str :param parent: Parent component :type parent: Component """ super().__init__(width=width) # Misses return DECERR not assert self.miss = lambda address : None # Endianness Swap self._endianness_swap_ = False
[docs] def read(self, address: int, num_bytes : int = None) -> int: """ Read a value from the memory at the specified address. Calls miss() if the address is not found in memory. :param address: Address to read from. :type address: int :return: Value at the specified address. :rtype: int """ if self._endianness_swap_: return self._convert_endianness_(super().read(address, num_bytes=num_bytes, rotated=True), num_bytes=num_bytes) else: return super().read(address, num_bytes=num_bytes, rotated=True)
def _mask_by_strobe(self, data: int, strobe, n_bytes: int) -> int: """Byte-wise apply WSTRB to WDATA: each cleared strobe bit zeroes the corresponding data byte. Mirrors what a spec-compliant subordinate sees on the wire: bytes with WSTRB=0 are not part of the write. With strobe all-zero the returned value is 0. """ if strobe is None: return int(data) s = int(strobe) d = int(data) mask = 0 for j in range(int(n_bytes)): if (s >> j) & 1: mask |= 0xFF << (8 * j) return d & mask
[docs] def write(self, address: int, value: int, num_bytes : int = None, strobe : int = None) -> None: """ Write a value to the memory at the specified address. Calls miss() if the address is not found in memory. :param address: Address to write to. :type address: int :param value: Value to write. :type value: int :param num_bytes: Number of bytes to write (default is width // 8). :type num_bytes: int, optional :param strobe: Strobe signal :type strobe: int, optional """ if self._endianness_swap_: value = self._convert_endianness_(value, num_bytes=num_bytes) # Write to memory super().write(address, value, num_bytes=num_bytes, strobe=strobe, rotated=True)
def _convert_endianness_(self, value: int, num_bytes: int) -> int: """ Convert the endianness of an integer represented in nbytes. :param value: Unsigned integer :param width: Bit width (e.g., 8, 16, 32) :return: Unsigned integer """ mask = (1 << (num_bytes * 8)) - 1 value &= mask b = value.to_bytes(num_bytes, byteorder="little", signed=False) return int.from_bytes(b[::-1], byteorder="little", signed=False) def _unsigned_to_signed_(self, value: int, width: int) -> int: """ Convert an unsigned integer to a signed integer with given bit width. :param value: Unsigned integer :param width: Bit width (e.g., 8, 16, 32) :return: Signed integer """ mask = 1 << (width - 1) if value & mask: return value - (1 << width) return value def _signed_to_unsigned_(self, value: int, width: int) -> int: """ Convert an signed integer to a unsigned integer with given bit width. :param value: Unsigned integer :param width: Bit width (e.g., 8, 16, 32) :return: Signed integer """ mask = (1 << width) - 1 return value & mask
[docs] def swap(self, address: int, value: int, num_bytes : int = None) -> int: """ Swap values :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ self.write(address, value, num_bytes = num_bytes)
[docs] def compare(self, address: int, value: int, compare: int, num_bytes : int = None) -> int: """ Compare Values :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) if old_value == compare: self.write(address, value, num_bytes=num_bytes)
[docs] def add(self, address: int, value: int, num_bytes : int = None) -> int: """ Add Values :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) self.write(address, value+old_value, num_bytes = num_bytes)
[docs] def clr(self, address: int, value: int, num_bytes : int = None) -> int: """ Bitwise Clear Value :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) self.write(address, ~value&old_value, num_bytes = num_bytes)
[docs] def xor(self, address: int, value: int, num_bytes : int = None) -> int: """ Bitwise Exclusive Or Value :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) self.write(address, value^old_value, num_bytes = num_bytes)
[docs] def set(self, address: int, value: int, num_bytes : int = None) -> int: """ Bitwise OR Value :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) self.write(address, value|old_value, num_bytes = num_bytes)
[docs] def smax(self, address: int, value: int, num_bytes : int = None) -> int: """ Signed Max :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self._unsigned_to_signed_(self.read(address, num_bytes=num_bytes), 8*num_bytes) value = self._unsigned_to_signed_(value, 8*num_bytes) self.write(address, self._signed_to_unsigned_(max(value,old_value),8*num_bytes), num_bytes = num_bytes)
[docs] def smin(self, address: int, value: int, num_bytes : int = None) -> int: """ Unsigned Min :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self._unsigned_to_signed_(self.read(address, num_bytes=num_bytes), 8*num_bytes) value = self._unsigned_to_signed_(value, 8*num_bytes) self.write(address, self._signed_to_unsigned_(min(value,old_value), 8*num_bytes), num_bytes = num_bytes)
[docs] def umax(self, address: int, value: int, num_bytes : int = None) -> int: """ Unsigned Max :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) self.write(address, max(value,old_value), num_bytes = num_bytes)
[docs] def umin(self, address: int, value: int, num_bytes : int = None) -> int: """ Unsigned Min :param address: Address :type address: int :param value: Value to apply :type value: int :return: Original value at the specified address. :rtype: int """ old_value = self.read(address, num_bytes=num_bytes) self.write(address, min(value,old_value), num_bytes = num_bytes)
[docs] def process_write(self, item : WriteItem) -> None: """ Process a sequence item update for memory update :param item: The sequence item to process :type item: SequenceItem """ if not isinstance(item, WriteItem): raise TypeError("Item must be a WriteItem") # Update memory contents for i,a in enumerate(get_burst_addresses(item.get("awaddr"), item.get("awlen", default=0), item.get("awsize", default=0), item.get("awburst", default=1) )): if self._check_address_(a): num_bytes = 1<<(item.get("awsize", default=0)) wdata = item.get("wdata", idx=i, default=0) wstrb = item.get("wstrb", idx=i, default=None) # Honor WSTRB on the operand uniformly across all writes (atomic # and non-atomic): bytes with WSTRB=0 are not part of the operand # and are zeroed before the operation runs. For spec-compliant # atomics A6.4.5 requires all strobes within the data window to # be asserted, so this mask is a no-op for them; it still # protects the operand against any unstrobed garbage outside the # data window. For non-atomic writes the strobe is also applied # at commit time via self.write(strobe=...) for partial writes. wdata = self._mask_by_strobe(wdata, wstrb, self.nbytes) if hasattr(item, "awatop"): # For COMPARE: compare beats map 1:1 to R-channel slots; swap beats do not. # Use item.get_rlen()+1 so this covers both the unpacked form (AWLEN>=1, # half the W beats are compare beats) and the packed form per # AXI A6.2 (AWLEN=0, single beat carrying both compare and swap). _is_compare_rbeat_ = True if item.awatop in [axi_atomic_t.COMPARE]: _n_compare_ = item.get_rlen() + 1 _is_compare_rbeat_ = (i < _n_compare_) # Return value is always original read (only for R-beat slots) if _is_compare_rbeat_: item.set("rdata", self.read(a, num_bytes=num_bytes), idx=i) # Handle endianness if item.awatop.endianness() != self.endianness: self._endianness_swap_ = True # Perform atomic update if item.awatop == axi_atomic_t.NON_ATOMIC: self.write(a, wdata, num_bytes=num_bytes, strobe=item.get("wstrb", idx=i, default=None)) elif item.awatop in [axi_atomic_t.STORE_LE_ADD, axi_atomic_t.LOAD_LE_ADD, axi_atomic_t.STORE_BE_ADD, axi_atomic_t.LOAD_BE_ADD]: self.add(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_CLR, axi_atomic_t.LOAD_LE_CLR, axi_atomic_t.STORE_BE_CLR, axi_atomic_t.LOAD_BE_CLR]: self.clr(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_EOR, axi_atomic_t.LOAD_LE_EOR, axi_atomic_t.STORE_BE_EOR, axi_atomic_t.LOAD_BE_EOR]: self.xor(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_SET, axi_atomic_t.LOAD_LE_SET, axi_atomic_t.STORE_BE_SET, axi_atomic_t.LOAD_BE_SET]: self.set(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_SMAX, axi_atomic_t.LOAD_LE_SMAX, axi_atomic_t.STORE_BE_SMAX, axi_atomic_t.LOAD_BE_SMAX]: self.smax(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_SMIN, axi_atomic_t.LOAD_LE_SMIN, axi_atomic_t.STORE_BE_SMIN, axi_atomic_t.LOAD_BE_SMIN]: self.smin(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_UMAX, axi_atomic_t.LOAD_LE_UMAX, axi_atomic_t.STORE_BE_UMAX, axi_atomic_t.LOAD_BE_UMAX]: self.umax(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.STORE_LE_UMIN, axi_atomic_t.LOAD_LE_UMIN, axi_atomic_t.STORE_BE_UMIN, axi_atomic_t.LOAD_BE_UMIN]: self.umin(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.SWAP]: self.swap(a, wdata, num_bytes=num_bytes) elif item.awatop in [axi_atomic_t.COMPARE]: # Two supported forms per AXI A6.2 (INCR variants only here): # - Packed (AWLEN=0): single beat holds compare in the lower # half-size bytes and swap in the upper half-size bytes; # the compare/swap target is half the AXI size. # - Unpacked (AWLEN>=1): first _n_compare_ beats carry the # compare data, paired beat-for-beat with the swap data in # the second half. Each pair maps to a memory word of awsize bytes. if item.get("awlen", default=0) == 0: half_bytes = num_bytes // 2 half_mask = (1 << (half_bytes * 8)) - 1 compare_val = wdata & half_mask swap_val = (wdata >> (half_bytes * 8)) & half_mask self.compare(a, swap_val, compare_val, num_bytes=half_bytes) elif _is_compare_rbeat_: swap_idx = i + _n_compare_ swap_data = item.get("wdata", idx=swap_idx, default=0) swap_strb = item.get("wstrb", idx=swap_idx, default=None) swap_data = self._mask_by_strobe(swap_data, swap_strb, self.nbytes) self.compare(a, swap_data, wdata, num_bytes=num_bytes) # else: swap beats of the unpacked form are handled by the paired compare beat else: raise ValueError() # No Endiannes Swap by default self._endianness_swap_ = False else: # Standard Write self.write(a, item.get("wdata", idx=i, default=0), strobe=item.get("wstrb", idx=i, default=None)) else: item.set("bresp", axi_resp_t.DECERR)
[docs] def process_read(self, item : ReadItem) -> None: """ Process a sequence item update for memory :param item: The sequence item to process :type item: SequenceItem """ decerr = False if not isinstance(item, ReadItem): raise TypeError("Item must be a ReadItem") for i,a in enumerate(get_burst_addresses(item.get("araddr"), item.get("arlen", default=0), item.get("arsize", default=0), item.get("arburst", default=1) )): if self._check_address_(a): num_bytes = 1<<(item.get("arsize", default=0)) aligned_addr = a & ~(num_bytes-1) item.set("rdata", self.read(aligned_addr, num_bytes=num_bytes), idx=i) item.set("rresp", axi_resp_t.OKAY, idx=i) else: item.set("rdata", axi_resp_t.OKAY, idx=i) item.set("rresp", axi_resp_t.DECERR, idx=i) decerr = True # Consistent DECERR if item._Consistent_DECERR_ and decerr: for i in range(len(item.rresp)): item.set("rresp", axi_resp_t.DECERR, idx=i)
__all__ = ["SubordinateMemory"]