Source code for avl_axi._rmonitor

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Monitor

import asyncio

import avl
import cocotb
from cocotb.triggers import FallingEdge, RisingEdge

from ._item import ReadItem
from ._signals import ar_m_signals, r_s_signals


[docs] class ReadMonitor(avl.Monitor):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the Read Monitor for the AXI agent. :param name: Name of the agent instance :type name: str :param parent: Parent component :type parent: Component """ super().__init__(name, parent) self.i_f = avl.Factory.get_variable(f"{self.get_full_name()}.i_f", None) self.responseQ = {} for i in range(1<<self.i_f.ID_R_WIDTH): self.responseQ[i] = avl.List()
[docs] def reset(self) -> None: """ Reset the monitor state """ for i in range(1<<self.i_f.ID_R_WIDTH): self.responseQ[i].clear()
[docs] async def wait_on_reset(self) -> None: """ Wait for the reset signal to go low and then call the reset method. This method is called to ensure that the driver is reset before driving any signals. It waits for the presetn signal to go low, indicating that the reset is active, and then calls the reset method to set all signals to their default values. """ try: await FallingEdge(self.i_f.aresetn) self.reset() except asyncio.CancelledError: raise except Exception: pass
[docs] async def monitor_control(self) -> None: """ Monitor the AXI Write Control Bus """ while True: item = ReadItem("from_monitor", self) cnt = None while True: if self.i_f.get("arvalid", default=0) == 1 and self.i_f.get("awakeup", default=1) == 1: if cnt is None: cnt = 0 else: cnt += 1 if self.i_f.get("arready", default=1) == 1: break await RisingEdge(self.i_f.aclk) for s in ar_m_signals: item.set(s, self.i_f.get(s, default=0)) item.set("ar_wait_cycles", cnt) item.resize() item.set_event("control") self.responseQ[item.get_id()].append(item) await RisingEdge(self.i_f.aclk)
[docs] async def monitor_response(self, id : int=0) -> None: """ Monitor the AXI Write Response Bus """ while True: item = await self.responseQ[id].blocking_pop() for i in range(item.get_rlen()+1): cnt = None while True: if self.i_f.get("rvalid", default=0) == 1 and self.i_f.get("rid", default=0) == id and self.i_f.get("awakeup", default=1) == 1: if cnt is None: cnt = 0 else: cnt += 1 if self.i_f.get("rready", default=1) == 1: break await RisingEdge(self.i_f.aclk) for s in r_s_signals: item.set(s, self.i_f.get(s, default=0), idx=i) item.set("r_wait_cycles", cnt, idx=i) # Reduced data phase if not hasattr(item, "awaddr"): for s in ["rdata", "rresp", "ruser", "rpoison", "rtrace", "rloop"]: if hasattr(item, s): [_or_, _and_] = getattr(item, f"_{s}_") _or_ |= item.get(s, idx=i) _and_ &= item.get(s, idx=i) if i == item.get_rlen(): # Sanity Checks item.sanity() # Export # Inform sequence response phase is complete # Extra checks for items which have both r and b responses (atomics) # Only call response callback when all completed if not item.has_bresp(): item.set_event("response", item) self.item_export.write(item) else: if hasattr(item, "_bresp_complete_"): delattr(item, "_bresp_complete_") item.set_event("response", item) self.item_export.write(item) else: setattr(item, "_rresp_complete_", True) # Wait for next edge await RisingEdge(self.i_f.aclk)
[docs] async def run_phase(self): """ Run phase for the Requester Driver. This method is called during the run phase of the simulation. It is responsible for driving the request signals based on the sequencer's items. :raises NotImplementedError: If the run phase is not implemented. """ self.reset() while True: # Wait until the we aren't in reset while True: await RisingEdge(self.i_f.aclk) aresetn = self.i_f.get("aresetn", default=0) if aresetn.is_resolvable and aresetn == 1: break tasks = [] tasks.append(cocotb.start_soon(self.monitor_control())) for i in range(1<<self.i_f.ID_R_WIDTH): tasks.append(cocotb.start_soon(self.monitor_response(i))) await self.wait_on_reset() for t in tasks: if not t.done(): t.cancel()
__all__ = ["ReadMonitor"]