# Source code for avl_axi._swdriver

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Driver

import random

import avl
import cocotb
from cocotb.triggers import RisingEdge
import random

from ._driver import Driver
from ._item import SequenceItem, WriteItem
from ._signals import aw_m_signals, aw_s_signals, b_s_signals, w_m_signals, w_s_signals
from ._utils import get_beat_byte_offset


class SubordinateWriteDriver(Driver):
    """
    Subordinate-side driver for the AXI write channels (AW, W and B).

    Accepted AW control items and W data beats are collected in separate
    queues, paired up by :meth:`_wait_for_data_`, and then handed to the
    response queue from which :meth:`drive_response` drives the B channel.
    """

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Subordinate Write Driver for the AXI agent.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component
        :type parent: Component
        """
        super().__init__(name, parent)

        # The factory keys must be built from the *result* of get_full_name();
        # interpolating the bound method itself yields a key no user can match.
        full_name = self.get_full_name()

        self.in_order = avl.Factory.get_variable(f"{full_name}.in_order", True)
        """Responses return in order of control"""

        self.qosaccept = avl.Factory.get_variable(f"{full_name}.qosaccept", 0)
        """QOS Accept hint value"""

        # Subordinate Read Driver - required for atomic loads
        self._srdrv_ = None

        # Memory Mode
        self.memory = None

        # Items Queues
        self.controlQ = avl.List()
        self.dataQ = avl.List()
        self.responseQ = avl.List()

        # Exclusive Monitor
        self.emonitor = None

        # AXI A3.2.3 / A3.4.1 narrow-transfer byte-lane de-shift opt-in (via AgentCfg).
        # Symmetric with ManagerWriteDriver: when enabled, the wdata/wstrb captured
        # from the bus on each W beat is un-rotated by the per-beat byte offset so
        # downstream consumers (e.g. SubordinateMemory) always see the operand at
        # lane 0. When disabled the bus word is stored verbatim (legacy behaviour).
        _cfg_ = avl.Factory.get_variable(f"{full_name}.cfg", None)
        self.narrow_transfer_lane_steering = (
            bool(_cfg_.narrow_transfer_lane_steering) if _cfg_ is not None else False
        )

    async def reset(self) -> None:
        """
        Reset the driver by setting all signals to their default values.

        This method is called when the driver is reset.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        # Write Signals
        for s in aw_s_signals + w_s_signals + b_s_signals:
            self.i_f.set(s, 0)

        # QOS Accept
        self.i_f.set("vawqosaccept", self.qosaccept)

    async def _wait_for_data_(self) -> None:
        """
        Wait for the wdata to populate control items.

        This way the data phase can be ahead of control phase: control items
        and data beats are queued independently and joined here. Runs forever;
        it is started as a background task by :meth:`drive_control`.
        """
        while True:
            item = await self.controlQ.blocking_pop()

            # AXI A3.2.3 / A3.4.1 narrow-transfer byte-lane de-shift opt-in:
            # if enabled, compute the per-beat byte offset from the AW address
            # and un-rotate wdata/wstrb so the operand always lands at lane 0
            # in the item. Otherwise store the bus word verbatim (legacy).
            if self.narrow_transfer_lane_steering:
                awaddr = int(item.get("awaddr", default=0))
                awsize = int(item.get("awsize", default=0))
                awburst = int(item.get("awburst", default=1))
                awlen = int(item.get("awlen", default=0))

            # One data beat per transfer in the burst (get_len() + 1 beats)
            for i in range(item.get_len() + 1):
                d = await self.dataQ.blocking_pop()

                if self.narrow_transfer_lane_steering:
                    byte_offset = get_beat_byte_offset(
                        awaddr, i, awlen, awsize, awburst, self.i_f.STRB_WIDTH
                    )
                else:
                    byte_offset = 0

                for k, v in d.items():
                    # Only copy signals the item actually models
                    if hasattr(item, k):
                        if byte_offset and k == "wdata":
                            # Data shifts by whole bytes
                            v = int(v) >> (byte_offset * 8)
                        elif byte_offset and k == "wstrb":
                            # Strobes carry one bit per byte lane
                            v = int(v) >> byte_offset
                        item.set(k, v, idx=i)

            # Handle Memory Access
            if self.memory is not None:
                self.memory.process_write(item)

            # Send item to response phases
            if item.has_bresp():
                self.responseQ.append(item)

            if item.has_rresp():
                self._srdrv_.responseQ.append(item)

    async def quiesce_control(self) -> None:
        """
        Quiesce the control signals by setting them to their default values.

        This method is called after a control transaction is completed.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        for s in aw_s_signals:
            self.i_f.set(s, 0)

    async def drive_control(self) -> None:
        """
        Drive the control (AW) channel ready side.

        Waits until out of reset and awake, applies the control rate limit,
        asserts ``awready`` and waits for ``awvalid``. The accepted control
        signals are captured into a new WriteItem and queued on ``controlQ``.

        This method runs in an infinite loop and should be started as a
        separate coroutine. It also starts :meth:`_wait_for_data_`.
        """
        self.controlQ.clear()
        self.tasks.append(cocotb.start_soon(self._wait_for_data_()))

        while True:
            self.i_f.set("awready", 0)

            # Hold off while in reset or asleep
            while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0:
                await RisingEdge(self.i_f.aclk)

            item = WriteItem("from_sdriver", self)

            # Rate Limiter
            await self.wait_on_rate(self.control_rate_limit())

            self.i_f.set("awready", 1)
            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("awvalid"):
                    break

            await self.quiesce_control()

            # Populate Item with control data
            for s in aw_m_signals:
                item.set(s, self.i_f.get(s, default=0))

            item.resize()
            self.controlQ.append(item)

    async def quiesce_data(self) -> None:
        """
        Quiesce the data signals by setting them to their default values.

        This method is called after a data transaction is completed.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        for s in w_s_signals:
            self.i_f.set(s, 0)

    async def drive_data(self) -> None:
        """
        Drive the data (W) channel ready side.

        Waits until out of reset and awake, applies the data rate limit,
        asserts ``wready`` and waits for ``wvalid``. Each accepted beat is
        captured as a dict of W signal values and queued on ``dataQ``.

        This method runs in an infinite loop and should be started as a
        separate coroutine.
        """
        self.dataQ.clear()

        while True:
            self.i_f.set("wready", 0)

            # Hold off while in reset or asleep
            while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0:
                await RisingEdge(self.i_f.aclk)

            # Rate Limiter
            await self.wait_on_rate(self.data_rate_limit())

            self.i_f.set("wready", 1)
            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("wvalid"):
                    break

            data = {}
            for s in w_m_signals:
                data[s] = self.i_f.get(s, default=0)
            self.dataQ.append(data)

            await self.quiesce_data()

    async def quiesce_response(self) -> None:
        """
        Quiesce the response signals by setting them to their default values.

        This method is called after a response transaction is completed.
        By default 0's all signals except ``bpending``, which is left as it
        was driven - can be overridden in subclasses to add randomization or
        other behavior.
        """
        for s in b_s_signals:
            if s != "bpending":
                self.i_f.set(s, 0)

    async def drive_response(self) -> None:
        """
        Drive the response (B) channel valid side.

        Waits for a queued response while out of reset and awake, selects the
        next item (oldest first when ``in_order`` or the interface's
        ``Ordered_Write_Observation`` is set, otherwise a random entry),
        passes it through :meth:`get_next_item` and the exclusive monitor (if
        one is attached), then waits for credit and rate limits, asserts
        ``bvalid`` with the item's response signals and waits for ``bready``.

        This method runs in an infinite loop and should be started as a
        separate coroutine.
        """
        self.responseQ.clear()

        while True:
            # Hold off while idle, in reset or asleep. The reset signal is
            # "aresetn", the same name polled by the control and data loops.
            while (
                not self.responseQ
                or self.i_f.get("aresetn") == 0
                or self.i_f.get("awakeup", default=1) == 0
            ):
                await RisingEdge(self.i_f.aclk)

            if self.in_order or self.i_f.Ordered_Write_Observation:
                idx = 0
            else:
                idx = random.randrange(len(self.responseQ))
            item = await self.get_next_item(self.responseQ.pop(idx))

            # Exclusive monitor - optional, defaults to None in __init__
            if self.emonitor is not None:
                self.emonitor.process_write(item)

            # Credit Control
            await self.wait_on_credit("b", [0])

            # Rate Limiter
            await self.wait_on_rate(self.response_rate_limit())

            # Pending - raise bpending and wait a clock before driving bvalid
            if not bool(self.i_f.get("bpending", default=1)):
                self.i_f.set("bpending", 1)
                await RisingEdge(self.i_f.aclk)

            for s in b_s_signals:
                if s == "bvalid":
                    self.i_f.set(s, 1)
                elif s == "bpending":
                    # Randomly drop bpending according to the pending rate
                    if random.random() > self.pending_rate_limit():
                        self.i_f.set(s, 0)
                else:
                    self.i_f.set(s, item.get(s, default=0))

            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("bready", default=1):
                    break

            await self.quiesce_response()

    async def drive_credits(self) -> None:
        """
        Drive credits.

        Returns immediately unless the interface's ``AXI_Transport`` is
        ``"Credited"``. Otherwise, on every clock outside reset, randomly
        grants AW and W credits per resource plane (and shared credits when
        enabled) while the current counts are below their maximums.
        """
        if self.i_f.AXI_Transport != "Credited":
            return

        while True:
            await RisingEdge(self.i_f.aclk)

            if self.i_f.get("aresetn") == 0:
                continue

            awcrdt = 0
            awcrdtsh = 0
            wcrdt = 0
            wcrdtsh = 0

            # Per resource-plane credits: one bit per plane
            for i in range(self.i_f.Num_RP_AWW):
                if (
                    int(self.i_f.get("awcredits", idx=i, default=0)) < self.i_f.NUM_CREDITS
                    and random.random() <= self.credit_rate_limit()
                ):
                    awcrdt |= 1 << i

                if (
                    int(self.i_f.get("wcredits", idx=i, default=0)) < self.i_f.NUM_CREDITS
                    and random.random() <= self.credit_rate_limit()
                ):
                    wcrdt |= 1 << i

            # Shared credits are read at index Num_RP_AWW, after the planes
            if (
                bool(self.i_f.Shared_Credits_AW)
                and int(self.i_f.get("awcredits", idx=self.i_f.Num_RP_AWW, default=0))
                < self.i_f.NUM_SHARED_CREDITS
                and random.random() <= self.credit_rate_limit()
            ):
                awcrdtsh |= 1

            if (
                bool(self.i_f.Shared_Credits_W)
                and int(self.i_f.get("wcredits", idx=self.i_f.Num_RP_AWW, default=0))
                < self.i_f.NUM_SHARED_CREDITS
                and random.random() <= self.credit_rate_limit()
            ):
                wcrdtsh |= 1

            self.i_f.set("awcrdt", awcrdt)
            self.i_f.set("wcrdt", wcrdt)
            self.i_f.set("awcrdtsh", awcrdtsh)
            self.i_f.set("wcrdtsh", wcrdtsh)

    async def get_next_item(self, item: SequenceItem = None) -> SequenceItem:
        """
        Get the next item to be processed.

        This method can be overridden in subclasses to modify the item before
        it is processed.

        :param item: The item to be processed
        :type item: SequenceItem
        :return: The item to be processed
        :rtype: SequenceItem
        :raises ValueError: If ``item`` is not a WriteItem
        """
        if not isinstance(item, WriteItem):
            raise ValueError("get_next_item() - expected type WriteItem")

        return item
class SubordinateWriteMemoryDriver(SubordinateWriteDriver):
    """Subordinate write driver variant that is backed by the parent's memory."""

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Subordinate Write Driver for the AXI agent.

        This agent behaves as a memory. Memory operations are performed at
        the end of the aw/w phase, as responses can be delayed.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component; its ``smem`` attribute is used as
            the backing memory
        :type parent: Component
        """
        super().__init__(name, parent)

        # Assign the memory
        self.memory = parent.smem
# Public names exported by a star-import of this module.
__all__ = ["SubordinateWriteDriver", "SubordinateWriteMemoryDriver"]