# Source code for avl_axi._mwdriver

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Driver


import avl
import cocotb
from cocotb.triggers import RisingEdge, NextTimeStep
import random

from ._driver import Driver
from ._signals import aw_m_signals, b_m_signals, b_s_signals, w_m_signals
from ._types import axi_atomic_t
from ._utils import get_beat_byte_offset

class ManagerWriteDriver(Driver):
    """
    AXI manager-side write driver.

    Drives the write address (AW) and write data (W) channels and accepts
    write responses (B) for the items handed over by the sequencer. Each
    channel is served by its own coroutine (:meth:`drive_control`,
    :meth:`drive_data`, :meth:`drive_response`) fed from ``controlQ``,
    ``dataQ`` and ``responseQ`` respectively.
    """

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the Manager Write Driver for the AXI agent.

        :param name: Name of the agent instance
        :type name: str
        :param parent: Parent component
        :type parent: Component
        :raises ValueError: If ``allow_early_data`` is enabled while
            ``max_outstanding`` is set
        """
        super().__init__(name, parent)

        # Manager Read Driver. Left as None here; it is dereferenced for
        # atomics and for items that also carry a read response, so it must
        # be assigned before such items are driven.
        self._mrdrv_ = None

        # Items Queues (responseQ holds one FIFO per write ID)
        self.controlQ = []
        self.dataQ = []
        self.responseQ = {i: [] for i in range(1 << self.i_f.ID_W_WIDTH)}
        self.response_pending = 0

        # Data before Control
        self.allow_early_data = avl.Factory.get_variable(f"{self.get_full_name()}.allow_early_data", False)
        """Allow data phase (W) to start before control phase (AW) is accepted"""

        if self.allow_early_data and self.max_outstanding is not None:
            raise ValueError("allow_early_data and max_outstanding are mutually exclusive")

        # AXI A3.2.3 / A3.4.1 narrow-transfer byte-lane placement opt-in (via AgentCfg).
        _cfg_ = avl.Factory.get_variable(f"{self.get_full_name()}.cfg", None)
        self.narrow_transfer_lane_steering = bool(_cfg_.narrow_transfer_lane_steering) if _cfg_ is not None else False

    async def reset(self) -> None:
        """
        Reset the driver by setting all signals to their default values.

        This method is called when the driver is reset.
        By default 0's all signals - can be overridden in subclasses to add
        randomization or other behavior.
        """
        # Write Signals
        for s in aw_m_signals + w_m_signals + b_m_signals:
            self.i_f.set(s, 0)

    async def quiesce_control(self) -> None:
        """
        Quiesce the control signals by setting them to their default values.

        This method is called to ensure that the control signals are in a
        known state. By default 0's all signals - can be overridden in
        subclasses to add randomization or other behavior.

        ``awpending`` is deliberately left untouched; it is managed by
        :meth:`drive_control`.
        """
        for s in aw_m_signals:
            if s != "awpending":
                self.i_f.set(s, 0)

    async def drive_control(self) -> None:
        """
        Drive the control signals based on the items in the control queue.

        This method is responsible for driving the control (AW) signals
        according to the protocol. It never returns; the control queue is
        emptied every time the coroutine is (re)started.
        """
        self.controlQ = []

        while True:
            # Idle until there is work and the interface is out of reset
            while not self.controlQ or self.i_f.get("aresetn") == 0:
                await RisingEdge(self.i_f.aclk)

            item = self.controlQ.pop(0)
            self.wake_export.write(item)
            self.i_f.set("awvalid", 0)

            # Wake
            await item.wait_on_event("awake")

            # Credit Control
            rp = [item.get("awrp", default=0)]
            if self.i_f.Shared_Credits_AW == 1:
                rp.append(self.i_f.Num_RP_AWW)
            sel_rp = await self.wait_on_credit("aw", rp)

            # Rate Limiter
            await self.wait_on_rate(self.control_rate_limit())

            # Unique ID - atomics are additionally held off until the same ID
            # is idle on the read driver
            is_atomic = item.get("awatop", default=axi_atomic_t.NON_ATOMIC) != axi_atomic_t.NON_ATOMIC
            if item.get_idunq() or is_atomic:
                while self._unique_ids_[item.get_id()] > 0:
                    await RisingEdge(self.i_f.aclk)
            if is_atomic:
                while self._mrdrv_._unique_ids_[item.get_id()] > 0:
                    await RisingEdge(self.i_f.aclk)

            # TAG Unique ID
            if item.get_tagop() != 0:
                while self._tag_ids_[item.get_id()] > 0:
                    await RisingEdge(self.i_f.aclk)

            # Max Outstanding
            while self.max_outstanding is not None and self._outstanding_transactions_ >= self.max_outstanding:
                await RisingEdge(self.i_f.aclk)
            self._outstanding_transactions_ += 1

            # Pending - raise awpending one clock ahead of awvalid unless it
            # is still asserted from a previous transfer
            if not bool(self.i_f.get("awpending")):
                self.i_f.set("awpending", 1)
                await RisingEdge(self.i_f.aclk)

            for s in aw_m_signals:
                if s == "awvalid":
                    self.i_f.set(s, 1)
                elif s == "awsharedcrd":
                    self.i_f.set(s, (sel_rp == self.i_f.Num_RP_AWW))
                elif s == "awpending":
                    # Randomly drop pending; otherwise it stays asserted
                    if random.random() > self.pending_rate_limit():
                        self.i_f.set(s, 0)
                else:
                    self.i_f.set(s, item.get(s, default=0))

            # Start Data Phase (already queued by run_phase when early data
            # is allowed)
            if not self.allow_early_data:
                self.dataQ.append(item)

            # Hold the request until it is accepted
            while True:
                await RisingEdge(self.i_f.aclk)
                if self.i_f.get("awready", default=1) and self.i_f.get("awakeup", default=1):
                    break

            # Clear the bus
            await self.quiesce_control()

            # Inform sequence control phase is completed
            item.set_event("control", item)

    async def quiesce_data(self) -> None:
        """
        Quiesce the data signals by setting them to their default values.

        This method is called to ensure that the data signals are in a known
        state. By default 0's all signals - can be overridden in subclasses
        to add randomization or other behavior.

        ``wpending`` is deliberately left untouched; it is managed by
        :meth:`drive_data`.
        """
        for s in w_m_signals:
            if s != "wpending":
                self.i_f.set(s, 0)

    async def drive_data(self) -> None:
        """
        Drive the data signals based on the items in the data queue.

        This method is responsible for driving the data (W) signals
        according to the protocol, one beat at a time. It never returns; the
        data queue is emptied every time the coroutine is (re)started.
        """
        self.dataQ = []

        while True:
            # Idle until there is work and the interface is out of reset
            while not self.dataQ or self.i_f.get("aresetn") == 0:
                await RisingEdge(self.i_f.aclk)

            item = self.dataQ.pop(0)

            # Wake
            await item.wait_on_event("awake")

            last_beat = item.get_len()

            # Address attributes are constant for the whole burst, so fetch
            # them once rather than on every beat.
            if self.narrow_transfer_lane_steering:
                awaddr = int(item.get("awaddr", default=0))
                awsize = int(item.get("awsize", default=0))
                awburst = int(item.get("awburst", default=1))
                awlen = int(item.get("awlen", default=0))

            for i in range(last_beat + 1):
                self.i_f.set("wvalid", 0)

                # Credit Control
                # NOTE(review): the item key is "wwrp" here whereas the AW
                # channel uses "awrp" - confirm against w_m_signals.
                rp = [item.get("wwrp", default=0)]
                if self.i_f.Shared_Credits_W == 1:
                    rp.append(self.i_f.Num_RP_AWW)
                sel_rp = await self.wait_on_credit("w", rp)

                # Rate Limiter
                await self.wait_on_rate(self.data_rate_limit())

                # Pending - raise wpending one clock ahead of wvalid unless it
                # is still asserted. The check is made on the W channel's own
                # pending signal (not the read channel's "arpending").
                if not bool(self.i_f.get("wpending")):
                    self.i_f.set("wpending", 1)
                    await RisingEdge(self.i_f.aclk)

                # Narrow-transfer byte-lane placement (AXI A3.2.3 / A3.4.1) is opt-in
                # via cfg.narrow_transfer_lane_steering: when enabled, position this
                # beat's wdata/wstrb at byte lanes [byte_offset .. byte_offset +
                # (1<<awsize) - 1] of the data bus. When disabled, drive the values
                # verbatim (legacy behaviour, byte_offset effectively 0).
                if self.narrow_transfer_lane_steering:
                    byte_offset = get_beat_byte_offset(
                        awaddr, i, awlen, awsize, awburst, self.i_f.STRB_WIDTH
                    )
                else:
                    byte_offset = 0

                for s in w_m_signals:
                    if s == "wvalid":
                        self.i_f.set(s, 1)
                    elif s == "wlast":
                        self.i_f.set(s, i == last_beat)
                    elif s == "wsharedcrd":
                        self.i_f.set(s, (sel_rp == self.i_f.Num_RP_AWW))
                    elif s == "wpending":
                        # Randomly drop pending; otherwise it stays asserted
                        if random.random() > self.pending_rate_limit():
                            self.i_f.set(s, 0)
                    elif s == "wdata" and byte_offset:
                        self.i_f.set(s, int(item.get(s, idx=i, default=0)) << (byte_offset * 8))
                    elif s == "wstrb" and byte_offset:
                        self.i_f.set(s, int(item.get(s, idx=i, default=0)) << byte_offset)
                    else:
                        self.i_f.set(s, item.get(s, idx=i, default=0))

                # Hold the beat until it is accepted
                while True:
                    await RisingEdge(self.i_f.aclk)
                    if self.i_f.get("wready", default=1) and self.i_f.get("awakeup", default=1):
                        break

            # Clear the bus
            await self.quiesce_data()

            # Inform sequence data phase is complete
            item.set_event("data", item)

    async def quiesce_response(self) -> None:
        """
        Quiesce the response signals by setting them to their default values.

        This method is called to ensure that the response signals are in a
        known state. By default 0's all signals - can be overridden in
        subclasses to add randomization or other behavior.
        """
        for s in b_m_signals:
            self.i_f.set(s, 0)

    async def drive_response(self) -> None:
        """
        Drive the response signals based on the items in the response queue.

        This method is responsible for accepting write responses (B channel)
        according to the protocol and copying the sampled response fields
        into the matching item. It never returns; the response queues and
        the pending counter are cleared every time the coroutine is
        (re)started.
        """
        for k in self.responseQ:
            self.responseQ[k] = []
        # Keep the counter in step with the (now empty) queues; a stale count
        # would re-arm bready and pop from an empty per-ID queue.
        self.response_pending = 0

        while True:
            # Idle until a response is expected and the interface is out of reset
            while self.response_pending == 0 or self.i_f.get("aresetn") == 0:
                await RisingEdge(self.i_f.aclk)

            # Rate Limiter
            await self.wait_on_rate(self.response_rate_limit())

            self.i_f.set("bready", 1)
            while True:
                await RisingEdge(self.i_f.aclk)
                if bool(self.i_f.get("bvalid", default=1)) and bool(self.i_f.get("bready", default=1)):
                    break

            # Responses are matched to the oldest outstanding item of that ID
            bid = int(self.i_f.get("bid", default=0))
            item = self.responseQ[bid].pop(0)
            for s in b_s_signals:
                if s != "bvalid":
                    item.set(s, self.i_f.get(s, default=0))

            await self.quiesce_response()

            # Decrement outstanding response counter
            self.response_pending -= 1

            # Inform sequence response phase is complete
            # Extra checks for items which have both r and b responses (atomics)
            # Only call response callback when all completed
            if not item.has_rresp():
                item.set_event("response", item)
            elif hasattr(item, "_rresp_complete_"):
                # Read side finished first (marker presumably set by the read
                # driver - confirm there): clear it and complete the item.
                del item._rresp_complete_
                item.set_event("response", item)
            else:
                # Write side finished first: leave a marker for the read side.
                item._bresp_complete_ = True

    async def drive_credits(self) -> None:
        """
        Drive credits

        Only active when the interface ``AXI_Transport`` is ``"Credited"``;
        otherwise returns immediately. Each clock out of reset, ``bcrdt`` is
        asserted when ``bcredits`` is below ``NUM_CREDITS`` and the random
        draw falls within the credit rate limit, and de-asserted otherwise.
        """
        if self.i_f.AXI_Transport != "Credited":
            return

        while True:
            await RisingEdge(self.i_f.aclk)

            if self.i_f.get("aresetn") == 0:
                continue

            if int(self.i_f.get("bcredits", idx=0, default=0)) < self.i_f.NUM_CREDITS and random.random() <= self.credit_rate_limit():
                self.i_f.set("bcrdt", 1)
            else:
                self.i_f.set("bcrdt", 0)

    async def run_phase(self) -> None:
        """
        Run phase for the Manager Write Driver.

        This method is called during the run phase of the simulation.
        It starts the base-class run phase in the background, then pulls
        items from the sequencer and dispatches every write item (one that
        has an ``awaddr`` attribute) to the control, data and response
        queues. Items without ``awaddr`` are skipped.
        """
        item = None

        cocotb.start_soon(super().run_phase())

        while True:
            item = await self.get_next_item(item)

            # Not a write item - nothing for this driver to do
            if not hasattr(item, "awaddr"):
                continue

            item.add_event("control", self._activate_)
            item.add_event("response", self._deactivate_)

            self.controlQ.append(item)
            if self.allow_early_data:
                # Data may start before the AW handshake, so queue it now
                self.dataQ.append(item)

            item_id = item.get_id()
            if item.has_bresp():
                self.responseQ[item_id].append(item)
                self.response_pending += 1

            # Items with a read response are also handed to the read driver
            if item.has_rresp():
                self._mrdrv_.responseQ[item_id].append(item)
                self._mrdrv_.response_pending += 1

            item.set_event("done")

# Public API exported by this module
__all__ = ["ManagerWriteDriver"]