# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Monitor
import asyncio
import avl
import cocotb
from cocotb.triggers import FallingEdge, RisingEdge
from ._item import WriteItem
from ._signals import aw_m_signals, b_s_signals, w_m_signals
class WriteMonitor(avl.Monitor):
    """
    Monitor for the AXI write channels.

    Samples the write control (AW), write data (W) and write response (B)
    signals of the interface, assembles them into ``WriteItem`` objects and
    publishes completed items through ``item_export``.
    """

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Write Monitor for the AXI agent.

        :param name: Name of the monitor instance
        :type name: str
        :param parent: Parent component
        :type parent: Component
        """
        super().__init__(name, parent)

        # Interface handle, looked up in the factory by this component's full name
        self.i_f = avl.Factory.get_variable(f"{self.get_full_name()}.i_f", None)

        # Manager Read Monitor. Not assigned anywhere in this class; it is
        # dereferenced in monitor_control() for items that report has_rresp().
        self._mrmon_ = None

        # Data beats captured from the W channel, waiting to be merged into an item
        self.dataQ = avl.List()
        # Items whose control phase completed, waiting for their data beats
        self.controlQ = avl.List()
        # Items waiting for a write response: one queue per possible ID value
        self.responseQ = {i: avl.List() for i in range(1 << self.i_f.ID_W_WIDTH)}

    def reset(self) -> None:
        """
        Reset the monitor state by emptying the data, control and
        per-ID response queues.
        """
        self.dataQ.clear()
        self.controlQ.clear()
        for i in range(1 << self.i_f.ID_W_WIDTH):
            self.responseQ[i].clear()

    async def wait_on_reset(self) -> None:
        """
        Wait for a falling edge on ``aresetn`` and then call :meth:`reset`.

        Cancellation is propagated to the caller. Any other exception is
        swallowed, in which case the method returns without resetting.
        """
        try:
            await FallingEdge(self.i_f.aresetn)
            self.reset()
        except asyncio.CancelledError:
            raise
        except Exception:
            # NOTE(review): deliberate best-effort kept from the original
            # implementation (presumably for interfaces without a usable
            # aresetn); failures here are silent - confirm this is intended.
            pass

    async def _wait_for_data_(self) -> None:
        """
        Wait for the wdata to populate control items.

        Pops one item from the control queue, then pops ``get_len() + 1``
        data beats from the data queue and copies their fields into the item.
        Because both queues are drained with blocking pops, the data phase can
        be ahead of the control phase. Completed items are appended to the
        response queue selected by the item's ID.
        """
        while True:
            item = await self.controlQ.blocking_pop()

            for i in range(item.get_len() + 1):
                d = await self.dataQ.blocking_pop()

                # Copy only the captured fields the item actually declares
                for k, v in d.items():
                    if hasattr(item, k):
                        item.set(k, v, idx=i)

                # Reduced data phase: fold this beat into the OR / AND
                # accumulators stored on the item as "_<signal>_".
                # NOTE(review): the update is only visible on the item if the
                # accumulator objects implement in-place |= / &= - confirm.
                for s in ["wdata", "wstrb", "wuser", "wpoison", "wtrace"]:
                    if hasattr(item, s):
                        [_or_, _and_] = getattr(item, f"_{s}_")
                        _or_ |= item.get(s, idx=i)
                        _and_ &= item.get(s, idx=i)

            item.set_event("data")
            self.responseQ[item.get_id()].append(item)

    async def monitor_control(self) -> None:
        """
        Monitor the AXI Write Control Bus.

        For each accepted control transfer a new ``WriteItem`` is created,
        filled from the ``aw_m_signals`` and appended to the control queue.
        ``aw_wait_cycles`` records how many clock edges ``awvalid`` (with
        ``awakeup``) was observed high before ``awready`` was seen.
        """
        self.controlQ.clear()

        while True:
            item = WriteItem("from_monitor", self)

            # None until awvalid is first seen; 0 on the first valid cycle
            cnt = None
            while True:
                if self.i_f.get("awvalid", default=0) == 1 and self.i_f.get("awakeup", default=1) == 1:
                    if cnt is None:
                        cnt = 0
                    else:
                        cnt += 1

                    if self.i_f.get("awready", default=1) == 1:
                        break
                await RisingEdge(self.i_f.aclk)

            for s in aw_m_signals:
                item.set(s, self.i_f.get(s, default=0))
            item.set("aw_wait_cycles", cnt)

            item.resize()
            item.set_event("control")
            self.controlQ.append(item)

            # Items that also produce a read response are handed to the read
            # monitor as well; this requires _mrmon_ to have been assigned.
            if item.has_rresp():
                self._mrmon_.responseQ[item.get_id()].append(item)

            await RisingEdge(self.i_f.aclk)

    async def monitor_data(self) -> None:
        """
        Monitor the AXI Write Data Bus.

        Each accepted data beat is captured as a dict of the ``w_m_signals``
        plus ``w_wait_cycles`` and appended to the data queue.
        """
        self.dataQ.clear()

        # None until wvalid is first seen; 0 on the first valid cycle
        cnt = None
        while True:
            if self.i_f.get("wvalid", default=0) == 1 and self.i_f.get("awakeup", default=1) == 1:
                if cnt is None:
                    cnt = 0
                else:
                    cnt += 1

                if self.i_f.get("wready", default=1) == 1:
                    data = {}
                    for s in w_m_signals:
                        data[s] = self.i_f.get(s, default=0)
                    data["w_wait_cycles"] = cnt
                    self.dataQ.append(data)
                    # Start counting afresh for the next beat
                    cnt = None

            await RisingEdge(self.i_f.aclk)

    async def monitor_response(self, id: int = 0) -> None:
        """
        Monitor the AXI Write Response Bus for a single ID.

        :param id: awid whose responses this task tracks (the name shadows
                   the builtin but is kept for interface compatibility)
        :type id: int
        """
        self.responseQ[id].clear()

        while True:
            item = await self.responseQ[id].blocking_pop()

            # Initialise once per response so the counter accumulates across
            # the cycles where bvalid is high but bready is not. It was
            # previously reset on every loop iteration, which always
            # reported b_wait_cycles as 0.
            cnt = None
            while True:
                if self.i_f.get("bvalid", default=0) == 1 and self.i_f.get("bid", default=0) == id:
                    if cnt is None:
                        cnt = 0
                    else:
                        cnt += 1

                    if self.i_f.get("bready", default=1) == 1:
                        break
                await RisingEdge(self.i_f.aclk)

            for s in b_s_signals:
                item.set(s, self.i_f.get(s, default=0))
            item.set("b_wait_cycles", cnt)

            # Sanity Checks
            item.sanity()

            # Export
            if not item.has_rresp():
                item.set_event("response")
                self.item_export.write(item)
            else:
                # The item also expects a read response. Export only if the
                # read side has already flagged completion; otherwise flag
                # the write response as complete (presumably so the read
                # monitor exports it later - verify against that monitor).
                if hasattr(item, "_rresp_complete_"):
                    delattr(item, "_rresp_complete_")
                    item.set_event("response")
                    self.item_export.write(item)
                else:
                    setattr(item, "_bresp_complete_", True)

            # Wait for next edge
            await RisingEdge(self.i_f.aclk)

    async def run_phase(self) -> None:
        """
        Run phase for the Write Monitor.

        Clears the monitor state, then repeatedly: waits on clock edges until
        ``aresetn`` is resolvable and equal to 1, starts the data-merge,
        control, data and per-ID response monitoring tasks, waits in
        :meth:`wait_on_reset`, and cancels any task that is still running
        before starting over.
        """
        self.reset()

        while True:
            # Wait until we aren't in reset
            while True:
                await RisingEdge(self.i_f.aclk)
                aresetn = self.i_f.get("aresetn", default=0)
                if aresetn.is_resolvable and aresetn == 1:
                    break

            tasks = [
                cocotb.start_soon(self._wait_for_data_()),
                cocotb.start_soon(self.monitor_control()),
                cocotb.start_soon(self.monitor_data()),
            ]
            # One response monitor per possible ID value
            for i in range(1 << self.i_f.ID_W_WIDTH):
                tasks.append(cocotb.start_soon(self.monitor_response(i)))

            await self.wait_on_reset()

            # Stop every monitoring task that is still active
            for t in tasks:
                if not t.done():
                    t.cancel()
# Public names exported by a wildcard import of this module.
__all__ = ["WriteMonitor"]