Skip to content
Snippets Groups Projects
Commit b3de40e5 authored by dam1n19's avatar dam1n19
Browse files

Refactored ADP Cocotb driver

parent e5d443b8
Branches
Tags
No related merge requests found
......@@ -13,17 +13,47 @@ from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStrea
CLK_PERIOD = (10, "ns")
# Class for ADP AXI Stream Interface
class ADPDriver ():
class ADPDriver():
def __init__(self, send, recieve):
# Send Stream to NanoSoC
self.send = send
# Send Recieve to NanoSoC
self.recieve = recieve
# Read ADP String
@cocotb.coroutine
async def read(self):
read_str = ""
while True:
# Read Bytes from ADP AXI Stream
data = await self.recieve.read()
curr_char = chr(data[0])
# Combine Bytes into String
if (curr_char != '\n'):
read_str += curr_char
else:
return read_str
# Write ADP String
# @cocotb.coroutine
# async def read(self):
# read_str = ""
# while True:
# # Read Bytes from ADP AXI Stream
# data = await self.recieve.read()
# curr_char = chr(data[0])
# # Combine Bytes into String
# if (curr_char != '\n'):
# read_str += curr_char
# else:
# return read_str
# Control ADP AXI Stream bus and create ADP Driver Object
def setup_adp(dut):
adp_sender = AxiStreamSource(AxiStreamBus.from_prefix(dut, "txd8"), dut.XTAL1, dut.NRST)
adp_reciever = AxiStreamSink(AxiStreamBus.from_prefix(dut, "rxd8"), dut.XTAL1, dut.NRST)
logging.getLogger("cocotb.nanosoc_tb.rxd8").setLevel(logging.WARNING)
driver = ADPDriver(adp_sender, adp_reciever)
return driver
......@@ -43,6 +73,24 @@ async def write(driver, buf):
print(i)
await driver.tx.send(str(ord(i)))
return wr_count
@cocotb.coroutine
async def write(driver, buf):
wr_count = 0
for i in buf:
print(i)
await driver.tx.send(str(ord(i)))
return wr_count
# Wait for bootcode to finish
@cocotb.coroutine
async def wait_bootcode(dut, driver):
bootcode_last = "** Remap->IMEM0"
while True:
read_str = await driver.read()
dut.log.info(read_str)
if read_str == bootcode_last:
break
# Basic Test Clocks Test
@cocotb.test()
......@@ -54,26 +102,23 @@ async def test_clocks(dut):
# Basic Test Reading from ADP
@cocotb.test()
async def test_driver(dut):
"""Tests ADP Driver"""
log = logging.getLogger(f"cocotb.test")
async def test_adp_read(dut):
await setup_dut(dut)
adp_driver = setup_adp(dut)
logging.getLogger("cocotb.nanosoc_tb.rxd8").setLevel(logging.WARNING)
log.info("Setup Complete")
log.info("Starting Test")
temp_str = ""
while True:
# Read Bytes from ADP AXI Stream
data = await adp_driver.recieve.read()
curr_char = chr(data[0])
# Combine Bytes into Strings and Print Out
if (curr_char == '\n'):
log.info(temp_str)
# Exit Test on Last String
if temp_str == "** Remap->IMEM0":
break
temp_str = ""
else:
temp_str += curr_char
log.info("Driver Test Complete")
dut.log.info("Setup Complete")
dut.log.info("Starting Test")
await wait_bootcode(dut, adp_driver)
dut.log.info("ADP Read Test Complete")
# Basic Test Write to ADP
@cocotb.test()
async def test_adp_write(dut):
await setup_dut(dut)
adp_driver = setup_adp(dut)
dut.log.info("Setup Complete")
dut.log.info("Starting Test")
await wait_bootcode(dut, adp_driver)
dut.log.info("ADP Write Test Complete")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment