# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Subordinate Memory
import avl
from ._item import ReadItem, WriteItem
from ._types import axi_atomic_t, axi_resp_t
from ._utils import get_burst_addresses
[docs]
class SubordinateMemory(avl.Memory):
[docs]
def __init__(self, width : int = 32) -> None:
"""
Initialize the Memory for the AXI Subordinate Driver
:param name: Name of the agent instance
:type name: str
:param parent: Parent component
:type parent: Component
"""
super().__init__(width=width)
# Misses return DECERR not assert
self.miss = lambda address : None
# Endianness Swap
self._endianness_swap_ = False
[docs]
def read(self, address: int, num_bytes : int = None) -> int:
"""
Read a value from the memory at the specified address.
Calls miss() if the address is not found in memory.
:param address: Address to read from.
:type address: int
:return: Value at the specified address.
:rtype: int
"""
if self._endianness_swap_:
return self._convert_endianness_(super().read(address, num_bytes=num_bytes, rotated=True), num_bytes=num_bytes)
else:
return super().read(address, num_bytes=num_bytes, rotated=True)
[docs]
def write(self, address: int, value: int, num_bytes : int = None, strobe : int = None) -> None:
"""
Write a value to the memory at the specified address.
Calls miss() if the address is not found in memory.
:param address: Address to write to.
:type address: int
:param value: Value to write.
:type value: int
:param num_bytes: Number of bytes to write (default is width // 8).
:type num_bytes: int, optional
:param strobe: Strobe signal
:type strobe: int, optional
"""
if self._endianness_swap_:
value = self._convert_endianness_(value, num_bytes=num_bytes)
# Write to memory
super().write(address, value, num_bytes=num_bytes, strobe=strobe, rotated=True)
def _convert_endianness_(self, value: int, num_bytes: int) -> int:
"""
Convert the endianness of an integer represented in nbytes.
:param value: Unsigned integer
:param width: Bit width (e.g., 8, 16, 32)
:return: Unsigned integer
"""
mask = (1 << (num_bytes * 8)) - 1
value &= mask
b = value.to_bytes(num_bytes, byteorder="little", signed=False)
return int.from_bytes(b[::-1], byteorder="little", signed=False)
def _unsigned_to_signed_(self, value: int, width: int) -> int:
"""
Convert an unsigned integer to a signed integer with given bit width.
:param value: Unsigned integer
:param width: Bit width (e.g., 8, 16, 32)
:return: Signed integer
"""
mask = 1 << (width - 1)
if value & mask:
return value - (1 << width)
return value
def _signed_to_unsigned_(self, value: int, width: int) -> int:
"""
Convert an signed integer to a unsigned integer with given bit width.
:param value: Unsigned integer
:param width: Bit width (e.g., 8, 16, 32)
:return: Signed integer
"""
mask = (1 << width) - 1
return value & mask
[docs]
def swap(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Swap values
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
self.write(address, value, num_bytes = num_bytes)
[docs]
def compare(self, address: int, value: int, compare: int, num_bytes : int = None) -> int:
"""
Compare Values
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes//2)
if old_value == compare:
self.write(address, value, num_bytes=num_bytes//2)
[docs]
def add(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Add Values
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, value+old_value, num_bytes = num_bytes)
[docs]
def clr(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Bitwise Clear Value
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, ~value&old_value, num_bytes = num_bytes)
[docs]
def xor(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Bitwise Exclusive Or Value
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, value^old_value, num_bytes = num_bytes)
[docs]
def set(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Bitwise OR Value
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, value|old_value, num_bytes = num_bytes)
[docs]
def smax(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Signed Max
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self._unsigned_to_signed_(self.read(address, num_bytes=num_bytes), 8*num_bytes)
value = self._unsigned_to_signed_(value, 8*num_bytes)
self.write(address, self._signed_to_unsigned_(max(value,old_value),8*num_bytes), num_bytes = num_bytes)
[docs]
def smin(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Unsigned Min
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self._unsigned_to_signed_(self.read(address, num_bytes=num_bytes), 8*num_bytes)
value = self._unsigned_to_signed_(value, 8*num_bytes)
self.write(address, self._signed_to_unsigned_(min(value,old_value), 8*num_bytes), num_bytes = num_bytes)
[docs]
def umax(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Unsigned Max
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, max(value,old_value), num_bytes = num_bytes)
[docs]
def umin(self, address: int, value: int, num_bytes : int = None) -> int:
"""
Unsigned Min
:param address: Address
:type address: int
:param value: Value to apply
:type value: int
:return: Original value at the specified address.
:rtype: int
"""
old_value = self.read(address, num_bytes=num_bytes)
self.write(address, min(value,old_value), num_bytes = num_bytes)
[docs]
def process_write(self, item : WriteItem) -> None:
"""
Process a sequence item update for memory update
:param item: The sequence item to process
:type item: SequenceItem
"""
if not isinstance(item, WriteItem):
raise TypeError("Item must be a WriteItem")
# Update memory contents
for i,a in enumerate(get_burst_addresses(item.get("awaddr"),
item.get("awlen", default=0),
item.get("awsize", default=0),
item.get("awburst", default=1)
)):
if self._check_address_(a):
num_bytes = 2**(item.get("awsize", default=0))
wdata = item.get("wdata", idx=i, default=0)
if hasattr(item, "awatop"):
# Return value is always original read
item.set("rdata", self.read(a, num_bytes=num_bytes), idx=i)
# Handle endianness
if item.awatop.endianness() != self.endianness:
self._endianness_swap_ = True
# Perform atomic update
if item.awatop == axi_atomic_t.NON_ATOMIC:
self.write(a, wdata, num_bytes=num_bytes, strobe=item.get("wstrb", idx=i, default=None))
elif item.awatop in [axi_atomic_t.STORE_LE_ADD, axi_atomic_t.LOAD_LE_ADD, axi_atomic_t.STORE_BE_ADD, axi_atomic_t.LOAD_BE_ADD]:
self.add(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_CLR, axi_atomic_t.LOAD_LE_CLR, axi_atomic_t.STORE_BE_CLR, axi_atomic_t.LOAD_BE_CLR]:
self.clr(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_EOR, axi_atomic_t.LOAD_LE_EOR, axi_atomic_t.STORE_BE_EOR, axi_atomic_t.LOAD_BE_EOR]:
self.xor(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_SET, axi_atomic_t.LOAD_LE_SET, axi_atomic_t.STORE_BE_SET, axi_atomic_t.LOAD_BE_SET]:
self.set(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_SMAX, axi_atomic_t.LOAD_LE_SMAX, axi_atomic_t.STORE_BE_SMAX, axi_atomic_t.LOAD_BE_SMAX]:
self.smax(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_SMIN, axi_atomic_t.LOAD_LE_SMIN, axi_atomic_t.STORE_BE_SMIN, axi_atomic_t.LOAD_BE_SMIN]:
self.smin(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_UMAX, axi_atomic_t.LOAD_LE_UMAX, axi_atomic_t.STORE_BE_UMAX, axi_atomic_t.LOAD_BE_UMAX]:
self.umax(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.STORE_LE_UMIN, axi_atomic_t.LOAD_LE_UMIN, axi_atomic_t.STORE_BE_UMIN, axi_atomic_t.LOAD_BE_UMIN]:
self.umin(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.SWAP]:
self.swap(a, wdata, num_bytes=num_bytes)
elif item.awatop in [axi_atomic_t.COMPARE]:
comp = wdata
comp &= (1 << 8*num_bytes//2)-1
swap = wdata >> (8*num_bytes//2)
swap &= (1 << 8*num_bytes//2)-1
self.compare(a, swap, comp, num_bytes=num_bytes)
else:
raise ValueError()
# No Endiannes Swap by default
self._endianness_swap_ = False
else:
# Standard Write
self.write(a, item.get("wdata", idx=i, default=0), strobe=item.get("wstrb", idx=i, default=None))
else:
item.set("bresp", axi_resp_t.DECERR)
[docs]
def process_read(self, item : ReadItem) -> None:
"""
Process a sequence item update for memory
:param item: The sequence item to process
:type item: SequenceItem
"""
decerr = False
if not isinstance(item, ReadItem):
raise TypeError("Item must be a ReadItem")
for i,a in enumerate(get_burst_addresses(item.get("araddr"),
item.get("arlen", default=0),
item.get("arsize", default=0),
item.get("arburst", default=1)
)):
if self._check_address_(a):
num_bytes = 2**(item.get("arsize", default=0))
item.set("rdata", self.read(a, num_bytes=num_bytes), idx=i)
item.set("rresp", axi_resp_t.OKAY, idx=i)
else:
item.set("rdata", axi_resp_t.OKAY, idx=i)
item.set("rresp", axi_resp_t.DECERR, idx=i)
decerr = True
# Consistent DECERR
if item._Consistent_DECERR_ and decerr:
for i in range(len(item.rresp)):
item.set("rresp", axi_resp_t.DECERR, idx=i)
__all__ = ["SubordinateMemory"]