# Source code for avl_axi._srdriver

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Driver

import random

import avl
from cocotb.triggers import RisingEdge
import random

from ._driver import Driver
from ._item import ReadItem, SequenceItem
from ._signals import ar_m_signals, ar_s_signals, r_s_signals


class SubordinateReadDriver(Driver):
    """
    Subordinate-side driver for the read address (AR) and read data (R) channels.

    Accepted read requests are captured into :class:`ReadItem` objects, queued on
    ``responseQ`` and then played back beat-by-beat on the response channel.
    """

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Subordinate Read Driver for the AMBA agent.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component
        :type parent: Component
        """
        super().__init__(name, parent)

        # get_full_name must be *called*: interpolating the bound method would
        # put its repr in the key, so a path-keyed factory override could not match.
        self.in_order = avl.Factory.get_variable(f"{self.get_full_name()}.in_order", True)
        """Responses return in order of control"""

        self.qosaccept = avl.Factory.get_variable(f"{self.get_full_name()}.qosaccept", 0)
        """QOS Accept hint value"""

        # Subordinate Write Driver
        self._swdrv_ = None

        # Memory Mode
        self.memory = None

        # Items Queues
        self.responseQ = avl.List()

        # Exclusive Monitor
        self.emonitor = None

    async def reset(self) -> None:
        """
        Reset the driver by setting all signals to their default values.

        This method is called when the driver is reset.
        By default 0's all signals - can be overridden in subclasses to add randomization or other behavior.
        """
        # Read address and read data subordinate signals
        for s in ar_s_signals + r_s_signals:
            self.i_f.set(s, 0)

        # QOS Accept
        self.i_f.set("varqosaccept", self.qosaccept)

    async def quiesce_control(self) -> None:
        """
        Quiesce the control channel by setting all control signals to their default values.

        This method is called after a control transaction is completed.
        By default 0's all control signals - can be overridden in subclasses to add randomization or other behavior.
        """
        for s in ar_s_signals:
            self.i_f.set(s, 0)

    async def drive_control(self) -> None:
        """
        Drive the control channel by accepting read address transactions.

        This method runs in a loop: it holds ``arready`` low while in reset or asleep,
        applies the control rate limit, raises ``arready`` and waits for ``arvalid``.
        The accepted request is captured into a :class:`ReadItem`, optionally processed
        by the memory model and queued on ``responseQ`` for the response phase.
        """
        while True:
            self.i_f.set("arready", 0)

            while self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0:
                await RisingEdge(self.i_f.aclk)

            item = ReadItem("from_sdriver", self)

            # Rate Limiter
            await self.wait_on_rate(self.control_rate_limit())

            self.i_f.set("arready", 1)
            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("arvalid"):
                    break

            await self.quiesce_control()

            # Capture the request fields from the interface into the item
            for s in ar_m_signals:
                item.set(s, self.i_f.get(s, default=0))
            item.resize()

            # Handle Memory Access
            if self.memory is not None:
                self.memory.process_read(item)

            # Send item to read response phase
            self.responseQ.append(item)

    async def quiesce_data(self) -> None:
        """
        Quiesce the data channel.

        Intentionally a no-op in this driver: read data is returned by the
        response phase (see :meth:`quiesce_response`).
        """
        pass

    async def drive_data(self) -> None:
        """
        Drive the data channel.

        Intentionally a no-op in this driver: read data is returned by the
        response phase (see :meth:`drive_response`).
        """
        pass

    async def quiesce_response(self) -> None:
        """
        Quiesce the response channel by setting all response signals to their default values.

        This method is called after a response transaction is completed.
        By default 0's all response signals except ``rpending`` - can be overridden in
        subclasses to add randomization or other behavior.
        """
        for s in r_s_signals:
            if s != "rpending":
                self.i_f.set(s, 0)

    async def drive_response(self) -> None:
        """
        Drive the response channel by sending read data transactions.

        This method runs in a loop, sending one beat per iteration. The item is the
        head of ``responseQ`` when ``in_order`` is set or read interleaving is
        disabled on the interface, otherwise a randomly chosen queue entry. An item
        is removed from the queue once ``get_len() + 1`` beats have been sent.
        """
        self.responseQ.clear()

        while True:
            while not self.responseQ or self.i_f.get("aresetn") == 0 or self.i_f.get("awakeup", default=1) == 0:
                await RisingEdge(self.i_f.aclk)

            if self.in_order or self.i_f.Read_Interleaving_Disabled:
                idx = 0
            else:
                idx = random.randrange(len(self.responseQ))
            item = await self.get_next_item(self.responseQ[idx])

            # Credit Control
            await self.wait_on_credit("r", [0])

            # Rate Limiter
            await self.wait_on_rate(self.response_rate_limit())

            # Pending - raise rpending one cycle ahead of the beat if it is low
            if not bool(self.i_f.get("rpending", default=1)):
                self.i_f.set("rpending", 1)
                await RisingEdge(self.i_f.aclk)

            for s in r_s_signals:
                if s == "rvalid":
                    self.i_f.set(s, 1)
                elif s == "rlast":
                    self.i_f.set(s, item._rcnt_ == item.get_len())
                elif s == "rpending":
                    if random.random() > self.pending_rate_limit():
                        self.i_f.set(s, 0)
                else:
                    self.i_f.set(s, item.get(s, idx=item._rcnt_, default=0))

            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("rready", default=1):
                    break

            # Exclusive monitor - defaults to None in __init__, so only notify it
            # when one has been attached (same pattern as the memory guard).
            if self.emonitor is not None:
                self.emonitor.process_read(item)

            item._rcnt_ += 1
            if item._rcnt_ == item.get_len() + 1:
                # Final beat sent: drop the beat counter and retire the item
                delattr(item, "_rcnt_")
                self.responseQ.pop(idx)

            await self.quiesce_response()

    async def drive_credits(self) -> None:
        """
        Drive credits

        Returns immediately unless the interface transport is ``"Credited"``.
        Otherwise, on every clock edge out of reset, a credit is randomly granted
        (subject to the credit rate limit) on each AR resource plane whose credit
        count is below ``NUM_CREDITS``, and on the shared pool when enabled and
        below ``NUM_SHARED_CREDITS``.
        """
        if self.i_f.AXI_Transport != "Credited":
            return

        while True:
            await RisingEdge(self.i_f.aclk)

            if self.i_f.get("aresetn") == 0:
                continue

            arcrdt = 0
            arcrdtsh = 0
            for i in range(self.i_f.Num_RP_AR):
                if int(self.i_f.get("arcredits", idx=i, default=0)) < self.i_f.NUM_CREDITS and random.random() <= self.credit_rate_limit():
                    arcrdt |= 1 << i

            # Shared credit counter is read at index Num_RP_AR
            if bool(self.i_f.Shared_Credits_AR) and int(self.i_f.get("arcredits", idx=self.i_f.Num_RP_AR, default=0)) < self.i_f.NUM_SHARED_CREDITS and random.random() <= self.credit_rate_limit():
                arcrdtsh |= 1

            self.i_f.set("arcrdt", arcrdt)
            self.i_f.set("arcrdtsh", arcrdtsh)

    async def get_next_item(self, item : SequenceItem = None) -> SequenceItem:
        """
        Get the next item to be sent on the response channel.

        This method can be overridden in subclasses to modify the item before it is sent.
        By default, it initializes the read count if not already set.

        :param item: The item to be sent on the response channel
        :type item: SequenceItem
        :return: The item to be sent on the response channel
        :rtype: SequenceItem
        :raises ValueError: If ``item`` is not a :class:`SequenceItem`
        """
        if not isinstance(item, SequenceItem):
            raise ValueError("get_next_item() - expected type SequenceItem")

        if not hasattr(item, "_rcnt_"):
            item._rcnt_ = 0

        return item
class SubordinateReadRandomDriver(SubordinateReadDriver):
    """
    Subordinate read driver that randomizes each response item once,
    the first time it is presented to the response channel.
    """

    async def get_next_item(self, item : SequenceItem = None) -> SequenceItem:
        """
        Get the next item to be sent on the response channel.

        On the first presentation of an item (no beat counter yet) the beat counter
        is created, every request-channel field present on the item is constrained
        to its current value, and the item is randomized. Items already in flight
        are returned untouched.

        :param item: The item to be sent on the response channel
        :type item: SequenceItem
        :return: The item to be sent on the response channel
        :rtype: SequenceItem
        :raises ValueError: If ``item`` is not a :class:`SequenceItem`
        """
        if not isinstance(item, SequenceItem):
            raise ValueError("get_next_item() - expected type SequenceItem")

        # Already started - nothing to randomize
        if hasattr(item, "_rcnt_"):
            return item

        item._rcnt_ = 0

        # Pin the captured request fields before randomizing
        for name in ar_m_signals:
            if not hasattr(item, name):
                continue
            var = getattr(item, name)
            # val is bound as a default so each constraint keeps its own value
            item.add_constraint(f"_c_{name}_temp", lambda x, val=var.value: x == val, var)

        item.randomize()
        return item
class SubordinateReadMemoryDriver(SubordinateReadDriver):
    """Subordinate read driver backed by the parent's memory model."""

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Subordinate Read Memory Driver for the AXI agent.

        This agent behaves as a memory.
        Memory operations are performed at the end of the ar phase as responses can be delayed.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component
        :type parent: Component
        """
        super().__init__(name, parent)

        # Memory Mode - use the memory model held by the parent as ``smem``
        self.memory = parent.smem
# Public API of this module
__all__ = [
    "SubordinateReadDriver",
    "SubordinateReadRandomDriver",
    "SubordinateReadMemoryDriver",
]