From b3de40e529d558c96efac78feca5a1252e9443dc Mon Sep 17 00:00:00 2001 From: dam1n19 <dam1n19@soton.ac.uk> Date: Thu, 6 Jul 2023 17:21:19 +0100 Subject: [PATCH] Refactored ADP Cocotb driver --- verif/cocotb/test_adp.py | 89 ++++++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 22 deletions(-) diff --git a/verif/cocotb/test_adp.py b/verif/cocotb/test_adp.py index 64f2857..b4463d0 100644 --- a/verif/cocotb/test_adp.py +++ b/verif/cocotb/test_adp.py @@ -13,17 +13,47 @@ from cocotbext.axi import AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStrea CLK_PERIOD = (10, "ns") # Class for ADP AXI Stream Interface -class ADPDriver (): +class ADPDriver(): def __init__(self, send, recieve): # Send Stream to NanoSoC self.send = send # Send Recieve to NanoSoC self.recieve = recieve + + # Read ADP String + @cocotb.coroutine + async def read(self): + read_str = "" + while True: + # Read Bytes from ADP AXI Stream + data = await self.recieve.read() + curr_char = chr(data[0]) + # Combine Bytes into String + if (curr_char != '\n'): + read_str += curr_char + else: + return read_str + + # Write ADP String + # @cocotb.coroutine + # async def read(self): + # read_str = "" + # while True: + # # Read Bytes from ADP AXI Stream + # data = await self.recieve.read() + # curr_char = chr(data[0]) + # # Combine Bytes into String + # if (curr_char != '\n'): + # read_str += curr_char + # else: + # return read_str + # Control ADP AXI Stream bus and create ADP Driver Object def setup_adp(dut): adp_sender = AxiStreamSource(AxiStreamBus.from_prefix(dut, "txd8"), dut.XTAL1, dut.NRST) adp_reciever = AxiStreamSink(AxiStreamBus.from_prefix(dut, "rxd8"), dut.XTAL1, dut.NRST) + logging.getLogger("cocotb.nanosoc_tb.rxd8").setLevel(logging.WARNING) driver = ADPDriver(adp_sender, adp_reciever) return driver @@ -43,6 +73,24 @@ async def write(driver, buf): print(i) await driver.tx.send(str(ord(i))) return wr_count + +@cocotb.coroutine +async def write(driver, buf): + wr_count = 0 + for i in buf: + print(i) + await driver.tx.send(str(ord(i))) + return wr_count + +# Wait for bootcode to finish +@cocotb.coroutine +async def wait_bootcode(dut, driver): + bootcode_last = "** Remap->IMEM0" + while True: + read_str = await driver.read() + dut.log.info(read_str) + if read_str == bootcode_last: + break # Basic Test Clocks Test @cocotb.test() @@ -54,26 +102,23 @@ async def test_clocks(dut): # Basic Test Reading from ADP @cocotb.test() -async def test_driver(dut): - """Tests ADP Driver""" - log = logging.getLogger(f"cocotb.test") +async def test_adp_read(dut): await setup_dut(dut) adp_driver = setup_adp(dut) - logging.getLogger("cocotb.nanosoc_tb.rxd8").setLevel(logging.WARNING) - log.info("Setup Complete") - log.info("Starting Test") - temp_str = "" - while True: - # Read Bytes from ADP AXI Stream - data = await adp_driver.recieve.read() - curr_char = chr(data[0]) - # Combine Bytes into Strings and Print Out - if (curr_char == '\n'): - log.info(temp_str) - # Exit Test on Last String - if temp_str == "** Remap->IMEM0": - break - temp_str = "" - else: - temp_str += curr_char - log.info("Driver Test Complete") + dut.log.info("Setup Complete") + dut.log.info("Starting Test") + await wait_bootcode(dut, adp_driver) + dut.log.info("ADP Read Test Complete") + +# Basic Test Write to ADP +@cocotb.test() +async def test_adp_write(dut): + await setup_dut(dut) + adp_driver = setup_adp(dut) + dut.log.info("Setup Complete") + dut.log.info("Starting Test") + await wait_bootcode(dut, adp_driver) + dut.log.info("ADP Write Test Complete") + + + -- GitLab