Source code for avl_axi._swdriver

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Driver

import random

import avl
import cocotb
from cocotb.triggers import RisingEdge

from ._driver import Driver
from ._item import SequenceItem, WriteItem
from ._signals import aw_m_signals, aw_s_signals, b_s_signals, w_m_signals, w_s_signals


[docs] class SubordinateWriteDriver(Driver):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the Subordinate Write Driver for the AMBA agent. :param name: Name of the agent instance :type name: str :param parent: Parent component :type parent: Component """ super().__init__(name, parent) self.in_order = avl.Factory.get_variable(f"{self.get_full_name}.in_order", True) """Responses return in order of control""" self.qosaccept = avl.Factory.get_variable(f"{self.get_full_name}.qosaccept", 0) """QOS Accept hint value""" # Subordinate Read Driver - required for atomic loads self._srdrv_ = None # Memory Mode self.memory = None # Items Queues self.controlQ = avl.List() self.dataQ = avl.List() self.responseQ = avl.List() # Exclusive Monitor self.emonitor = None
[docs] async def reset(self) -> None: """ Reset the driver by setting all signals to their default values. This method is called when the driver is reset. By default 0's all signals - can be overridden in subclasses to add randomization or other behavior. """ # Write Signals for s in aw_s_signals + w_s_signals + b_s_signals: self.i_f.set(s, 0) # QOS Accept self.i_f.set("vawqosaccept", self.qosaccept)
async def _wait_for_data_(self) -> None: """ Wait for the wdata to populate control items This way the data phase can be ahead of control phase """ while True: item = await self.controlQ.blocking_pop() for i in range(item.get_len()+1): d = await self.dataQ.blocking_pop() for k,v in d.items(): if hasattr(item, k): item.set(k, v, idx=i) # Handle Memory Access if self.memory is not None: self.memory.process_write(item) # Send item to response phases if item.has_bresp(): self.responseQ.append(item) if item.has_rresp(): self._srdrv_.responseQ.append(item)
[docs] async def quiesce_control(self) -> None: """ Quiesce the control signals by setting them to their default values. This method is called after a control transaction is completed. By default 0's all signals - can be overridden in subclasses to add randomization or other behavior. """ for s in aw_s_signals: self.i_f.set(s, 0)
[docs] async def drive_control(self) -> None: """ Drive the control signals by waiting for valid signals and then setting the ready signals. This method runs in an infinite loop and should be started as a separate coroutine. """ self.controlQ.clear() self.tasks.append(cocotb.start_soon(self._wait_for_data_())) while True: self.i_f.set("awready", 0) while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0: await RisingEdge(self.i_f.aclk) item = WriteItem("from_sdriver", self) # Rate Limiter await self.wait_on_rate(self.control_rate_limit()) self.i_f.set("awready", 1) while True: await RisingEdge(self.i_f.aclk) if self.i_f.get("awvalid"): break await self.quiesce_control() # Populate Item with control data for s in aw_m_signals: item.set(s, self.i_f.get(s, default=0)) item.resize(finalize=True) self.controlQ.append(item)
[docs] async def quiesce_data(self) -> None: """ Quiesce the data signals by setting them to their default values. This method is called after a data transaction is completed. By default 0's all signals - can be overridden in subclasses to add randomization or other behavior. """ for s in w_s_signals: self.i_f.set(s, 0)
[docs] async def drive_data(self) -> None: """ Drive the data signals by waiting for valid signals and then setting the ready signals. This method runs in an infinite loop and should be started as a separate coroutine. """ self.dataQ.clear() while True: self.i_f.set("wready", 0) while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0: await RisingEdge(self.i_f.aclk) # Rate Limiter await self.wait_on_rate(self.control_rate_limit()) self.i_f.set("wready", 1) while True: await RisingEdge(self.i_f.aclk) if self.i_f.get("wvalid"): break data = {} for s in w_m_signals: data[s] = self.i_f.get(s, default=0) self.dataQ.append(data) await self.quiesce_data()
[docs] async def quiesce_response(self) -> None: """ Quiesce the response signals by setting them to their default values. This method is called after a response transaction is completed. By default 0's all signals - can be overridden in subclasses to add randomization or other behavior. """ for s in b_s_signals: self.i_f.set(s, 0)
[docs] async def drive_response(self) -> None: """ Drive the response signals by waiting for valid signals and then setting the ready signals. This method runs in an infinite loop and should be started as a separate coroutine. """ self.responseQ.clear() while True: while not self.responseQ or self.i_f.get("arestn") == 0 or self.i_f.get("awakeup", default=1) == 0: await RisingEdge(self.i_f.aclk) if self.in_order or self.i_f.Ordered_Write_Observation: idx = 0 else: idx = random.randrange(len(self.responseQ)) item = await self.get_next_item(self.responseQ.pop(idx)) # Exclusive monitor self.emonitor.process_write(item) # Rate Limiter await self.wait_on_rate(self.response_rate_limit()) for s in b_s_signals: if s in ["bvalid"]: self.i_f.set(s, 1) else: self.i_f.set(s, item.get(s, default=0)) while True: await RisingEdge(self.i_f.aclk) if self.i_f.get("bready"): break await self. quiesce_response()
[docs] async def get_next_item(self, item : SequenceItem = None) -> SequenceItem: """ Get the next item to be processed. This method can be overridden in subclasses to modify the item before it is processed. :param item: The item to be processed :type item: SequenceItem :return: The item to be processed :rtype: SequenceItem """ if not isinstance(item, WriteItem): raise ValueError("get_next_item() - expected type WriteItem") return item
[docs] class SubordinateWriteMemoryDriver(SubordinateWriteDriver):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the Subordinate Write Driver for the AXI agent. This agent behaves as a memory Memory operations are performed at the end of the aw/w phase as responses can be delayed :param name: Name of the agent instance :type name: str :param parent: Parent component :type parent: Component """ super().__init__(name, parent) # Assign the memory self.memory = parent.smem
__all__ = ["SubordinateWriteDriver", "SubordinateWriteMemoryDriver"]