# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Subordinate Memory
import avl
from ._item import ReadItem, WriteItem
from ._types import axi_atomic_t, axi_resp_t
from ._utils import get_burst_addresses
[docs]
class SubordinateMemory(avl.Memory):
[docs]
def __init__(self, width : int = 32) -> None:
"""
Initialize the Memory for the AXI Subordinate Driver
:param name: Name of the agent instance
:type name: str
:param parent: Parent component
:type parent: Component
"""
super().__init__(width=width)
# Misses return DECERR not assert
self.miss = lambda address : None
# Endianness Swap
self._endianness_swap_ = False
[docs]
def read(self, address: int, num_bytes : int = None) -> int:
"""
Read a value from the memory at the specified address.
Calls miss() if the address is not found in memory.
:param address: Address to read from.
:type address: int
:return: Value at the specified address.
:rtype: int
"""
if self._endianness_swap_:
return self._convert_endianness_(super().read(address, num_bytes=num_bytes, rotated=True), num_bytes=num_bytes)
else:
return super().read(address, num_bytes=num_bytes, rotated=True)
def _mask_by_strobe(self, data: int, strobe, n_bytes: int) -> int:
"""Byte-wise apply WSTRB to WDATA: each cleared strobe bit zeroes the
corresponding data byte. Mirrors what a spec-compliant subordinate sees
on the wire: bytes with WSTRB=0 are not part of the write. With strobe
all-zero the returned value is 0.
"""
if strobe is None:
return int(data)
s = int(strobe)
d = int(data)
mask = 0
for j in range(int(n_bytes)):
if (s >> j) & 1:
mask |= 0xFF << (8 * j)
return d & mask
[docs]
def write(self, address: int, value: int, num_bytes : int = None, strobe : int = None) -> None:
"""
Write a value to the memory at the specified address.
Calls miss() if the address is not found in memory.
:param address: Address to write to.
:type address: int
:param value: Value to write.
:type value: int
:param num_bytes: Number of bytes to write (default is width // 8).
:type num_bytes: int, optional
:param strobe: Strobe signal
:type strobe: int, optional
"""
if self._endianness_swap_:
value = self._convert_endianness_(value, num_bytes=num_bytes)
# Write to memory
super().write(address, value, num_bytes=num_bytes, strobe=strobe, rotated=True)
def _convert_endianness_(self, value: int, num_bytes: int) -> int:
"""
Convert the endianness of an integer represented in nbytes.
:param value: Unsigned integer
:param width: Bit width (e.g., 8, 16, 32)
:return: Unsigned integer
"""
mask = (1 << (num_bytes * 8)) - 1
value &= mask
b = value.to_bytes(num_bytes, byteorder="little", signed=False)
return int.from_bytes(b[::-1], byteorder="little", signed=False)
def _unsigned_to_signed_(self, value: int, width: int) -> int:
"""
Convert an unsigned integer to a signed integer with given bit width.
:param value: Unsigned integer
:param width: Bit width (e.g., 8, 16, 32)
:return: Signed integer
"""
mask = 1 << (width - 1)
if value & mask:
return value - (1 << width)
return value
def _signed_to_unsigned_(self, value: int, width: int) -> int:
"""
Convert an signed integer to a unsigned integer with given bit width.
:param value: Unsigned integer
:param width: Bit width (e.g., 8, 16, 32)
:return: Signed integer
"""
mask = (1 << width) - 1
return value & mask
[docs]
def swap(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Swap values
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
self.write(address, value, num_bytes = num_bytes)
[docs]
def compare(self, address: int, value: int, compare: int, num_bytes : int = None) -> int:
"""
Compare Values
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
if old_value == compare:
self.write(address, value, num_bytes=num_bytes)
[docs]
def add(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Add Values
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, value+old_value, num_bytes = num_bytes)
[docs]
def clr(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Bitwise Clear Value
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, ~value&old_value, num_bytes = num_bytes)
[docs]
def xor(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Bitwise Exclusive Or Value
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, value^old_value, num_bytes = num_bytes)
[docs]
def set(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Bitwise OR Value
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, value|old_value, num_bytes = num_bytes)
[docs]
def smax(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Signed Max
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self._unsigned_to_signed_(self.read(address, num_bytes=num_bytes), 8*num_bytes)
value = self._unsigned_to_signed_(value, 8*num_bytes)
self.write(address, self._signed_to_unsigned_(max(value,old_value),8*num_bytes), num_bytes = num_bytes)
[docs]
def smin(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Unsigned Min
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self._unsigned_to_signed_(self.read(address, num_bytes=num_bytes), 8*num_bytes)
value = self._unsigned_to_signed_(value, 8*num_bytes)
self.write(address, self._signed_to_unsigned_(min(value,old_value), 8*num_bytes), num_bytes = num_bytes)
[docs]
def umax(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Unsigned Max
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, max(value,old_value), num_bytes = num_bytes)
[docs]
def umin(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Unsigned Min
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, min(value,old_value), num_bytes = num_bytes)
[docs]
def process_write(self, item : WriteItem) -> None:
"""
Process a sequence item update for memory update
:param item: The sequence item to process
:type item: SequenceItem
"""
if not isinstance(item, WriteItem):
raise TypeError("Item must be a WriteItem")
# Update memory contents
for i,a in enumerate(get_burst_addresses(item.get("awaddr"),
item.get("awlen", default=0),
item.get("awsize", default=0),
item.get("awburst", default=1)
)):
if self._check_address_(a):
num_bytes = 1<<(item.get("awsize", default=0))
wdata = item.get("wdata", idx=i, default=0)
wstrb = item.get("wstrb", idx=i, default=None)
# Honor WSTRB on the operand uniformly across all writes (atomic
# and non-atomic): bytes with WSTRB=0 are not part of the operand
# and are zeroed before the operation runs. For spec-compliant
# atomics A6.4.5 requires all strobes within the data window to
# be asserted, so this mask is a no-op for them; it still
# protects the operand against any unstrobed garbage outside the
# data window. For non-atomic writes the strobe is also applied
# at commit time via self.write(strobe=...) for partial writes.
wdata = self._mask_by_strobe(wdata, wstrb, self.nbytes)
if hasattr(item, "awatop"):
# For COMPARE: compare beats map 1:1 to R-channel slots; swap beats do not.
# Use item.get_rlen()+1 so this covers both the unpacked form (AWLEN>=1,
# half the W beats are compare beats) and the packed form per
# AXI A6.2 (AWLEN=0, single beat carrying both compare and swap).
_is_compare_rbeat_ = True
if item.awatop in [axi_atomic_t.COMPARE]:
_n_compare_ = item.get_rlen() + 1
_is_compare_rbeat_ = (i < _n_compare_)
# Return value is always original read (only for R-beat slots)
if _is_compare_rbeat_:
item.set("rdata", self.read(a, num_bytes=num_bytes), idx=i)
# Handle endianness
if item.awatop.endianness() != self.endianness:
self._endianness_swap_ = True
# Perform atomic update
if item.awatop == axi_atomic_t.NON_ATOMIC:
self.write(a, wdata, num_bytes=num_bytes, strobe=item.get("wstrb", idx=i, default=None))
elif item.awatop in [axi_atomic_t.STORE_LE_ADD, axi_atomic_t.LOAD_LE_ADD, axi_atomic_t.STORE_BE_ADD, axi_atomic_t.LOAD_BE_ADD]:
self.add(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_CLR, axi_atomic_t.LOAD_LE_CLR, axi_atomic_t.STORE_BE_CLR, axi_atomic_t.LOAD_BE_CLR]:
self.clr(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_EOR, axi_atomic_t.LOAD_LE_EOR, axi_atomic_t.STORE_BE_EOR, axi_atomic_t.LOAD_BE_EOR]:
self.xor(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_SET, axi_atomic_t.LOAD_LE_SET, axi_atomic_t.STORE_BE_SET, axi_atomic_t.LOAD_BE_SET]:
self.set(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_SMAX, axi_atomic_t.LOAD_LE_SMAX, axi_atomic_t.STORE_BE_SMAX, axi_atomic_t.LOAD_BE_SMAX]:
self.smax(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_SMIN, axi_atomic_t.LOAD_LE_SMIN, axi_atomic_t.STORE_BE_SMIN, axi_atomic_t.LOAD_BE_SMIN]:
self.smin(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_UMAX, axi_atomic_t.LOAD_LE_UMAX, axi_atomic_t.STORE_BE_UMAX, axi_atomic_t.LOAD_BE_UMAX]:
self.umax(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_UMIN, axi_atomic_t.LOAD_LE_UMIN, axi_atomic_t.STORE_BE_UMIN, axi_atomic_t.LOAD_BE_UMIN]:
self.umin(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.SWAP]:
self.swap(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.COMPARE]:
# Two supported forms per AXI A6.2 (INCR variants only here):
# - Packed (AWLEN=0): single beat holds compare in the lower
# half-size bytes and swap in the upper half-size bytes;
# the compare/swap target is half the AXI size.
# - Unpacked (AWLEN>=1): first _n_compare_ beats carry the
# compare data, paired beat-for-beat with the swap data in
# the second half. Each pair maps to a memory word of awsize bytes.
if item.get("awlen", default=0) == 0:
half_bytes = num_bytes // 2
half_mask = (1 << (half_bytes * 8)) - 1
compare_val = wdata & half_mask
swap_val = (wdata >> (half_bytes * 8)) & half_mask
self.compare(a, swap_val, compare_val, num_bytes=half_bytes)
elif _is_compare_rbeat_:
swap_idx = i + _n_compare_
swap_data = item.get("wdata", idx=swap_idx, default=0)
swap_strb = item.get("wstrb", idx=swap_idx, default=None)
swap_data = self._mask_by_strobe(swap_data, swap_strb, self.nbytes)
self.compare(a, swap_data, wdata, num_bytes=num_bytes)
# else: swap beats of the unpacked form are handled by the paired compare beat
else:
raise ValueError()
# No Endiannes Swap by default
self._endianness_swap_ = False
else:
# Standard Write
self.write(a, item.get("wdata", idx=i, default=0), strobe=item.get("wstrb", idx=i, default=None))
else:
item.set("bresp", axi_resp_t.DECERR)
[docs]
def process_read(self, item : ReadItem) -> None:
"""
Process a sequence item update for memory
:param item: The sequence item to process
:type item: SequenceItem
"""
decerr = False
if not isinstance(item, ReadItem):
raise TypeError("Item must be a ReadItem")
for i,a in enumerate(get_burst_addresses(item.get("araddr"),
item.get("arlen", default=0),
item.get("arsize", default=0),
item.get("arburst", default=1)
)):
if self._check_address_(a):
num_bytes = 1<<(item.get("arsize", default=0))
aligned_addr = a & ~(num_bytes-1)
item.set("rdata", self.read(aligned_addr, num_bytes=num_bytes), idx=i)
item.set("rresp", axi_resp_t.OKAY, idx=i)
else:
item.set("rdata", axi_resp_t.OKAY, idx=i)
item.set("rresp", axi_resp_t.DECERR, idx=i)
decerr = True
# Consistent DECERR
if item._Consistent_DECERR_ and decerr:
for i in range(len(item.rresp)):
item.set("rresp", axi_resp_t.DECERR, idx=i)
__all__ = ["SubordinateMemory"]