# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Driver
import random
import avl
import cocotb
from cocotb.triggers import RisingEdge
import random
from ._driver import Driver
from ._item import SequenceItem, WriteItem
from ._signals import aw_m_signals, aw_s_signals, b_s_signals, w_m_signals, w_s_signals
from ._utils import get_beat_byte_offset
class SubordinateWriteDriver(Driver):
    """
    Subordinate-side driver for the AXI write channels (AW, W and B).

    ``drive_control`` and ``drive_data`` accept AW and W beats from the
    interface, ``_wait_for_data_`` merges them into a single item, and
    ``drive_response`` drives the B channel for each completed item.
    """

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Subordinate Write Driver for the AXI agent.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component
        :type parent: Component
        """
        super().__init__(name, parent)

        # Factory keys must be built from the *resolved* hierarchical name.
        # Interpolating the bound method itself (no call) yields a repr string
        # that can never match a user-supplied override.
        full_name = self.get_full_name()

        self.in_order = avl.Factory.get_variable(f"{full_name}.in_order", True)
        """Responses return in order of control"""

        self.qosaccept = avl.Factory.get_variable(f"{full_name}.qosaccept", 0)
        """QOS Accept hint value"""

        # Subordinate Read Driver - required for atomic loads
        self._srdrv_ = None

        # Memory Mode
        self.memory = None

        # Items Queues
        self.controlQ = avl.List()
        self.dataQ = avl.List()
        self.responseQ = avl.List()

        # Exclusive Monitor
        self.emonitor = None

        # AXI A3.2.3 / A3.4.1 narrow-transfer byte-lane de-shift opt-in (via AgentCfg).
        # Symmetric with ManagerWriteDriver: when enabled, the wdata/wstrb captured
        # from the bus on each W beat is shifted down by the per-beat byte offset so
        # downstream consumers (e.g. SubordinateMemory) always see the operand at
        # lane 0. When disabled the bus word is stored verbatim (legacy behaviour).
        _cfg_ = avl.Factory.get_variable(f"{full_name}.cfg", None)
        self.narrow_transfer_lane_steering = (
            bool(_cfg_.narrow_transfer_lane_steering) if _cfg_ is not None else False
        )

    async def reset(self) -> None:
        """
        Reset the driver by setting all signals to their default values.

        This method is called when the driver is reset.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        # Write Signals
        for s in aw_s_signals + w_s_signals + b_s_signals:
            self.i_f.set(s, 0)

        # QOS Accept
        self.i_f.set("vawqosaccept", self.qosaccept)

    async def _wait_for_data_(self) -> None:
        """
        Wait for the wdata to populate control items.

        Control items and data beats are consumed from independent queues, so
        the data phase can be ahead of the control phase. Once every beat of an
        item has been collected the item is passed to the memory (if any) and
        queued for its response phase(s).
        """
        while True:
            item = await self.controlQ.blocking_pop()

            # Latch the opt-in once per item so the setup below and the
            # per-beat de-shift can never disagree if the flag is changed
            # while this coroutine is suspended on a queue pop.
            steer = self.narrow_transfer_lane_steering

            # AXI A3.2.3 / A3.4.1 narrow-transfer byte-lane de-shift opt-in:
            # if enabled, compute the per-beat byte offset from the AW address
            # and shift wdata/wstrb down so the operand always lands at lane 0
            # in the item. Otherwise store the bus word verbatim (legacy).
            if steer:
                awaddr = int(item.get("awaddr", default=0))
                awsize = int(item.get("awsize", default=0))
                awburst = int(item.get("awburst", default=1))
                awlen = int(item.get("awlen", default=0))

            for i in range(item.get_len() + 1):
                d = await self.dataQ.blocking_pop()

                if steer:
                    byte_offset = get_beat_byte_offset(
                        awaddr, i, awlen, awsize, awburst, self.i_f.STRB_WIDTH
                    )
                else:
                    byte_offset = 0

                for k, v in d.items():
                    if hasattr(item, k):
                        if byte_offset and k == "wdata":
                            # One byte lane is 8 data bits
                            v = int(v) >> (byte_offset * 8)
                        elif byte_offset and k == "wstrb":
                            # One byte lane is 1 strobe bit
                            v = int(v) >> byte_offset
                        item.set(k, v, idx=i)

            # Handle Memory Access
            if self.memory is not None:
                self.memory.process_write(item)

            # Send item to response phases
            if item.has_bresp():
                self.responseQ.append(item)

            # NOTE: needs the subordinate read driver to have been attached
            # via _srdrv_ (it is None after __init__).
            if item.has_rresp():
                self._srdrv_.responseQ.append(item)

    async def quiesce_control(self) -> None:
        """
        Quiesce the control signals by setting them to their default values.

        This method is called after a control transaction is completed.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        for s in aw_s_signals:
            self.i_f.set(s, 0)

    async def drive_control(self) -> None:
        """
        Drive the control channel: raise awready, wait for awvalid and capture
        the AW signals into a new item.

        This method runs in an infinite loop and should be started as a
        separate coroutine. It also starts the ``_wait_for_data_`` task that
        pairs captured control items with data beats.
        """
        self.controlQ.clear()
        self.tasks.append(cocotb.start_soon(self._wait_for_data_()))

        while True:
            self.i_f.set("awready", 0)

            # Hold off while in reset or while the interface is not awake
            while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0:
                await RisingEdge(self.i_f.aclk)

            item = WriteItem("from_sdriver", self)

            # Rate Limiter
            await self.wait_on_rate(self.control_rate_limit())

            self.i_f.set("awready", 1)
            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("awvalid"):
                    break

            await self.quiesce_control()

            # Populate Item with control data
            for s in aw_m_signals:
                item.set(s, self.i_f.get(s, default=0))
            item.resize()

            self.controlQ.append(item)

    async def quiesce_data(self) -> None:
        """
        Quiesce the data signals by setting them to their default values.

        This method is called after a data transaction is completed.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        for s in w_s_signals:
            self.i_f.set(s, 0)

    async def drive_data(self) -> None:
        """
        Drive the data channel: raise wready, wait for wvalid and capture the
        W signals of each beat into the data queue.

        This method runs in an infinite loop and should be started as a
        separate coroutine.
        """
        self.dataQ.clear()

        while True:
            self.i_f.set("wready", 0)

            # Hold off while in reset or while the interface is not awake
            while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0:
                await RisingEdge(self.i_f.aclk)

            # Rate Limiter
            await self.wait_on_rate(self.data_rate_limit())

            self.i_f.set("wready", 1)
            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("wvalid"):
                    break

            # Snapshot this beat as a plain dict keyed by signal name
            data = {}
            for s in w_m_signals:
                data[s] = self.i_f.get(s, default=0)
            self.dataQ.append(data)

            await self.quiesce_data()

    async def quiesce_response(self) -> None:
        """
        Quiesce the response signals by setting them to their default values.

        This method is called after a response transaction is completed.
        By default 0's all signals except ``bpending``, which is managed by
        ``drive_response`` - can be overridden in subclasses to add
        randomization or other behavior.
        """
        for s in b_s_signals:
            if s != "bpending":
                self.i_f.set(s, 0)

    async def drive_response(self) -> None:
        """
        Drive the response channel: take a completed item from the response
        queue, assert bvalid with the item's response fields and wait for
        bready.

        This method runs in an infinite loop and should be started as a
        separate coroutine.
        """
        self.responseQ.clear()

        while True:
            # Wait for something to send, and for the interface to be out of
            # reset and awake (same gating as the control and data channels).
            while (
                not self.responseQ
                or self.i_f.get("aresetn") == 0
                or self.i_f.get("awakeup", default=1) == 0
            ):
                await RisingEdge(self.i_f.aclk)

            # Oldest item first when ordering is required, otherwise any item
            if self.in_order or self.i_f.Ordered_Write_Observation:
                idx = 0
            else:
                idx = random.randrange(len(self.responseQ))

            item = await self.get_next_item(self.responseQ.pop(idx))

            # Exclusive monitor - optional, like the memory model
            if self.emonitor is not None:
                self.emonitor.process_write(item)

            # Credit Control
            await self.wait_on_credit("b", [0])

            # Rate Limiter
            await self.wait_on_rate(self.response_rate_limit())

            # Pending - if bpending reads low, raise it one clock before bvalid
            if not bool(self.i_f.get("bpending", default=1)):
                self.i_f.set("bpending", 1)
                await RisingEdge(self.i_f.aclk)

            for s in b_s_signals:
                if s in ["bvalid"]:
                    self.i_f.set(s, 1)
                elif s == "bpending":
                    # Randomly drop bpending together with the response
                    if random.random() > self.pending_rate_limit():
                        self.i_f.set(s, 0)
                else:
                    self.i_f.set(s, item.get(s, default=0))

            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("bready", default=1):
                    break

            await self.quiesce_response()

    async def drive_credits(self) -> None:
        """
        Drive the AW/W credit-return signals.

        Returns immediately unless the interface transport is "Credited".
        Otherwise, on every clock outside reset, each resource plane (and the
        shared pool when enabled) whose credit count is below its limit is
        randomly granted a credit according to ``credit_rate_limit()``.
        """
        if self.i_f.AXI_Transport != "Credited":
            return

        while True:
            await RisingEdge(self.i_f.aclk)
            if self.i_f.get("aresetn") == 0:
                continue

            awcrdt = 0
            awcrdtsh = 0
            wcrdt = 0
            wcrdtsh = 0

            # Per resource-plane credits: one bit per plane
            for i in range(self.i_f.Num_RP_AWW):
                if (
                    int(self.i_f.get("awcredits", idx=i, default=0)) < self.i_f.NUM_CREDITS
                    and random.random() <= self.credit_rate_limit()
                ):
                    awcrdt |= 1 << i

                if (
                    int(self.i_f.get("wcredits", idx=i, default=0)) < self.i_f.NUM_CREDITS
                    and random.random() <= self.credit_rate_limit()
                ):
                    wcrdt |= 1 << i

            # Shared credits are read at the index following the last plane
            if (
                bool(self.i_f.Shared_Credits_AW)
                and int(self.i_f.get("awcredits", idx=self.i_f.Num_RP_AWW, default=0))
                < self.i_f.NUM_SHARED_CREDITS
                and random.random() <= self.credit_rate_limit()
            ):
                awcrdtsh |= 1

            if (
                bool(self.i_f.Shared_Credits_W)
                and int(self.i_f.get("wcredits", idx=self.i_f.Num_RP_AWW, default=0))
                < self.i_f.NUM_SHARED_CREDITS
                and random.random() <= self.credit_rate_limit()
            ):
                wcrdtsh |= 1

            self.i_f.set("awcrdt", awcrdt)
            self.i_f.set("wcrdt", wcrdt)
            self.i_f.set("awcrdtsh", awcrdtsh)
            self.i_f.set("wcrdtsh", wcrdtsh)

    async def get_next_item(self, item: SequenceItem = None) -> SequenceItem:
        """
        Get the next item to be processed.

        This method can be overridden in subclasses to modify the item before
        it is processed.

        :param item: The item to be processed
        :type item: SequenceItem
        :return: The item to be processed
        :rtype: SequenceItem
        :raises ValueError: If ``item`` is not a ``WriteItem``
        """
        if not isinstance(item, WriteItem):
            raise ValueError("get_next_item() - expected type WriteItem")
        return item
class SubordinateWriteMemoryDriver(SubordinateWriteDriver):
    """
    Subordinate Write Driver variant that behaves as a memory.

    Memory operations are performed at the end of the aw/w phase
    as responses can be delayed.
    """

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Subordinate Write Driver for the AXI agent and attach
        the parent's memory model to it.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component, which provides the memory as ``smem``
        :type parent: Component
        """
        super().__init__(name, parent)

        # Writes are applied to the memory owned by the parent component
        shared_memory = parent.smem
        self.memory = shared_memory
# Names exported by ``from <module> import *``
__all__ = [
    "SubordinateWriteDriver",
    "SubordinateWriteMemoryDriver",
]