# apb.py
# Authored by Daniel Newbrook
# MIT License
# Copyright (c) 2021 SystematIC Design BV
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
APB Transaction and Agent
(Driver + Monitor)
"""
from collections import deque
import random
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly
from cocotb.binary import BinaryValue
from cocotb_bus.drivers import BusDriver
from cocotb_bus.monitors import BusMonitor
from cocotb.result import ReturnValue
from cocotb.decorators import coroutine
from cocotb_coverage.crv import Randomized
# Direction names indexed by the value of the PWRITE bit (0 -> READ, 1 -> WRITE).
pwrite = ['READ', 'WRITE']
class APBTransaction(Randomized):
    """
    APB Transaction Class
    Defines the transaction in terms of its fields.

    Attributes set by the constructor:
        address        -- address of the transfer
        data           -- write data, or read data once it has been sampled
                          (None while a READ has no data yet)
        direction      -- 'READ' or 'WRITE'
        strobe         -- list of per-byte strobe flags (booleans)
        error          -- True when the transfer ended with an error
        bus_width      -- data bus width in bits
        address_width  -- address bus width in bits
        start_time     -- simulation time the transfer started, None until
                          something assigns it
    """

    def __init__(self, address, data=None, direction=None, strobe=None,
                 error=None, bus_width=32, address_width=12):
        """
        Build a transaction.

        When ``data`` is None the transaction is always a READ, whatever
        ``direction`` says.  When ``data`` is given, ``direction`` is used if
        supplied and defaults to 'WRITE' otherwise.  ``strobe`` defaults to
        four asserted byte strobes.
        """
        Randomized.__init__(self)

        # check input values
        assert direction in [None, 'READ', 'WRITE'], "The direction must be either: None, 'READ', 'WRITE'"

        # select based on read/write operation
        if data is not None:
            self.direction = direction if direction else 'WRITE'
            self.data = data
        else:
            self.direction = 'READ'
            self.data = None

        # save the straight through parameters
        self.address = address
        self.bus_width = bus_width
        self.address_width = address_width

        # Build the default strobe list per instance: a list literal in the
        # signature would be one object shared by every transaction.
        self.strobe = [True, True, True, True] if strobe is None else strobe

        # store the error setting
        self.error = error if error is not None else False

        # store time of the transaction
        self.start_time = None

    def post_randomize(self):
        '''
        Generate a randomized transaction
        '''
        # select a random direction
        self.direction = ['READ', 'WRITE'][random.randint(0, 1)]

        # select a random word-aligned address; randint is inclusive on both
        # ends, hence the "- 1" to stay inside the address space
        self.address = random.randint(0, 2**(self.address_width - 2) - 1) * 4

        # if we're writing generate data spanning the full bus width
        if self.direction == 'WRITE':
            self.data = random.randint(0, 2**self.bus_width - 1)

        # create random strobe data; rebind rather than mutate so a list
        # shared with another object is never modified behind its back
        self.strobe = [bool(random.randint(0, 1)) for _ in range(4)]

    def print(self):
        '''
        Print a transaction information in a nice readable format
        '''
        print('-'*120)
        print('APB Transaction - ', end='')
        # compare against None so that a transfer started at time 0 is
        # still reported as started
        if self.start_time is not None:
            print('Started at %d ns' % self.start_time)
        else:
            print('Has not occurred yet')
        print('')
        print(' Address: 0x%08X' % self.address)
        print(' Direction: %s' % self.direction)
        print(' Data: ', end='')
        if self.data is not None:
            # one hex digit per four bits of bus width
            print('0x%0*X ' % (self.bus_width // 4, self.data))
        else:
            print('NO DATA YET!')
        if self.error:
            print(' TRANSACTION ENDED IN ERROR!')
        print('')
        print('-'*120)

    def convert2string(self):
        """
        Returns a string - used by UVM.
        """
        # a READ that has not been sampled yet carries no data
        data = 'None' if self.data is None else hex(self.data)
        return "APB: address: %s, direction: %s, data: %s, strobe: %s" % (
            hex(self.address), self.direction, data, hex(self._strobe()))

    # overload the (in)equality operators - two transactions match when
    # their address, direction and data match
    def __ne__(self, other):
        # Derive from __eq__; returning NotImplemented unconditionally makes
        # Python fall back to an identity comparison.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __eq__(self, other):
        # compare each field; objects lacking the fields are not comparable
        try:
            return bool(self.address == other.address
                        and self.direction == other.direction
                        and self.data == other.data)
        except AttributeError:
            return NotImplemented

    def _strobe(self):
        """
        Return an integer representation of the byte strobes.

        The first list element becomes the most significant bit.
        """
        try:
            return int(''.join(['1' if x else '0' for x in self.strobe]), 2)
        except ValueError:
            # show the offending strobe list before propagating the error
            print(self.strobe)
            raise

    def __repr__(self):
        return self.convert2string()
class APBMonitor(BusMonitor):
    """
    APB Master Monitor
    Observes the bus to monitor all transactions and provide callbacks
    with the observed data
    """

    def __init__(self, entity, name, clock, pkg=False, signals=None, bus_width=32, **kwargs):
        """
        Select the signal naming scheme and attach the monitor to the bus.

        entity, name, clock and any extra keyword arguments are handed to
        BusMonitor.  ``signals`` overrides the signal names entirely;
        otherwise ``pkg=True`` builds '<name>_h2d_i.<sig>' / '<name>_d2h_o.<sig>'
        names and ``pkg=False`` uses the plain upper-case APB names.
        ``bus_width`` is the data bus width recorded on observed transactions.
        """
        # have the signals been explicitly defined?
        if signals:
            self._signals = signals
        else:
            # a SystemVerilog package is used
            if pkg:
                self._signals = {}
                for signal_name in ['psel', 'pwrite', 'penable', 'paddr', 'pwdata', 'pstrb']:
                    self._signals[signal_name.upper()] = name + '_h2d_i.' + signal_name
                for signal_name in ['prdata', 'pready', 'pslverr']:
                    self._signals[signal_name.upper()] = name + '_d2h_o.' + signal_name
                # the full names already contain the prefix
                name = None
            # just use the default APB names
            else:
                self._signals = [
                    "PSEL",
                    "PWRITE",
                    "PENABLE",
                    "PADDR",
                    "PWDATA",
                    "PRDATA",
                    "PREADY"]

        # NOTE(review): set for every naming scheme here; confirm this was not
        # meant to apply to the default-name branch only.
        self._optional_signals = [
            "PSLVERR",
            "PSTRB"]

        BusMonitor.__init__(self, entity, name, clock, **kwargs)
        self.clock = clock
        self.bus_width = bus_width

        # prime the monitor to begin
        self.reset()

    def reset(self):
        '''
        Mimic the reset function in hardware
        '''
        pass

    async def _monitor_recv(self):
        '''
        Keep watching the bus until the peripheral is signalled as:
        Selected
        Enabled
        Ready
        Then simply sample the address, data and direction
        '''
        await RisingEdge(self.clock)
        while True:
            # both slave and master are ready for transfer
            if self.bus.PSEL.value.integer and self.bus.PENABLE.value.integer and self.bus.PREADY.value.integer:
                # retrieve the data from the bus
                address = self.bus.PADDR.value.integer
                direction = pwrite[self.bus.PWRITE.value.integer]

                # are we reading or writing?
                if direction == 'READ':
                    data = self.bus.PRDATA.value.integer
                else:
                    data = self.bus.PWDATA.value.integer

                # store the transaction object
                transaction = APBTransaction(address=address,
                                             data=data,
                                             direction=direction,
                                             bus_width=self.bus_width)
                transaction.start_time = cocotb.utils.get_sim_time('ns')

                # find out if there's an error from the slave; PSLVERR is an
                # optional signal so it may not exist on this bus
                if hasattr(self.bus, 'PSLVERR') and self.bus.PSLVERR.value.integer:
                    transaction.error = True

                # signal to the callback
                self._recv(transaction)

            # begin next cycle
            await RisingEdge(self.clock)
class APBMasterDriver(BusDriver):
    """
    APB Master Driver
    Drives data onto the APB bus to setup for read/write to slave devices.
    """

    def __init__(self, entity, name, clock, pkg=False, signals=None, **kwargs):
        """
        Select the signal naming scheme, attach to the bus and zero the
        master-driven signals.

        entity, name, clock and any extra keyword arguments are handed to
        BusDriver (with '_' as the bus separator).  ``signals`` overrides the
        signal names entirely; otherwise ``pkg=True`` builds
        '<name>_h2d_i.<sig>' / '<name>_d2h_o.<sig>' names and ``pkg=False``
        uses the plain upper-case APB names.
        """
        # have the signals been explicitly defined?
        if signals:
            self._signals = signals
        else:
            # a SystemVerilog package is used
            if pkg:
                self._signals = {}
                for signal_name in ['psel', 'pwrite', 'penable', 'paddr', 'pwdata', 'pstrb']:
                    self._signals[signal_name.upper()] = name + '_h2d_i.' + signal_name
                for signal_name in ['prdata', 'pready', 'pslverr']:
                    self._signals[signal_name.upper()] = name + '_d2h_o.' + signal_name
                # the full names already contain the prefix
                name = None
            # just use the default APB names
            else:
                self._signals = [
                    "PSEL",
                    "PWRITE",
                    "PENABLE",
                    "PADDR",
                    "PWDATA",
                    "PRDATA",
                    "PREADY"]

        # NOTE(review): set for every naming scheme here; confirm this was not
        # meant to apply to the default-name branch only.
        self._optional_signals = [
            "PSLVERR",
            "PSTRB"]

        # inherit the bus driver
        BusDriver.__init__(self, entity, name, clock, bus_separator='_', **kwargs)
        self.clock = clock

        # no transfer is in flight until the pipeline coroutine says so;
        # defined here so busy_send can always read it
        self.transfer_busy = False

        # initialise all outputs to zero
        self.bus.PADDR.setimmediatevalue(0)
        self.bus.PWRITE.setimmediatevalue(0)
        self.bus.PSEL.setimmediatevalue(0)
        self.bus.PENABLE.setimmediatevalue(0)
        self.bus.PWDATA.setimmediatevalue(0)
        # PSTRB is optional and may be missing from the bus
        if hasattr(self.bus, 'PSTRB'):
            self.bus.PSTRB.setimmediatevalue(0)

        self.reset()

    def reset(self):
        '''
        Mimic the reset function in hardware
        '''
        # initialise the transmit queue
        self.transmit_queue = deque()
        self.transmit_coroutine = 0

    @coroutine
    async def busy_send(self, transaction):
        '''
        Provide a send method that waits for the transaction to complete.
        '''
        await self.send(transaction)
        while (self.transfer_busy):
            await RisingEdge(self.clock)

    @coroutine
    async def _driver_send(self, transaction, sync=True, hold=False, **kwargs):
        '''
        Append a new transaction to be transmitted
        '''
        # add new transaction
        self.transmit_queue.append(transaction)

        # launch a new transmit pipeline coroutine if we aren't holding and
        # the coroutine isn't already running.
        # If it is running it will just collect the transactions in the
        # queue once it gets to them.
        if not hold:
            if not self.transmit_coroutine:
                self.transmit_coroutine = cocotb.fork(self._transmit_pipeline())

    @coroutine
    async def _transmit_pipeline(self):
        '''
        Maintain a parallel operation transmitting all the items
        in the pipeline
        '''
        # default values
        state = 'SETUP'
        self.transfer_busy = True

        # optional signals may be absent from the bus
        has_pstrb = hasattr(self.bus, 'PSTRB')
        has_pslverr = hasattr(self.bus, 'PSLVERR')

        # while there's data in the queue keep transmitting
        while len(self.transmit_queue) > 0 or state != 'IDLE':
            if state == 'SETUP':
                # get a new transaction from the queue
                current_transaction = self.transmit_queue.popleft()
                current_transaction.start_time = cocotb.utils.get_sim_time('ns')

                # assign values in the control phase
                self.bus.PSEL <= 1
                self.bus.PADDR <= current_transaction.address
                self.bus.PWRITE <= pwrite.index(current_transaction.direction)

                # create the PSTRB signal, strobe[i] drives bit i
                if has_pstrb:
                    pstrb_int = 0
                    for i, pstrb_i in enumerate(current_transaction.strobe):
                        pstrb_int += pstrb_i << i
                    self.bus.PSTRB <= pstrb_int

                # write the data to the bus
                if current_transaction.direction == 'WRITE':
                    self.bus.PWDATA <= current_transaction.data

                # update state
                state = 'ACCESS'

            elif state == 'ACCESS':
                # tell the slave we're ready for the access phase
                self.bus.PENABLE <= 1
                state = 'SAMPLE'

            # every state, including a SAMPLE that is still waiting on
            # PREADY, consumes one clock cycle
            await RisingEdge(self.clock)

            if state == 'SAMPLE':
                # is the slave ready?
                if self.bus.PREADY.value.integer:
                    # check if the slave is asserting an error
                    if has_pslverr and self.bus.PSLVERR.value.integer:
                        current_transaction.error = True

                    # if this is a read we should sample the data
                    if current_transaction.direction == 'READ':
                        current_transaction.data = self.bus.PRDATA.value.integer

                    # what's the next state?
                    if len(self.transmit_queue) > 0:
                        state = 'SETUP'
                    else:
                        state = 'IDLE'
                    self.bus.PENABLE <= 0

        # reset the bus signals
        self.bus.PWDATA <= 0
        self.bus.PWRITE <= 0
        self.bus.PSEL <= 0
        self.bus.PENABLE <= 0
        self.transfer_busy = False