Skip to content
Snippets Groups Projects
Commit c1ede1b6 authored by dam1n19's avatar dam1n19
Browse files

Updates to Cocotb testbench and ft1248 to axis interface

parent d8e1e715
No related branches found
No related tags found
No related merge requests found
...@@ -13,63 +13,119 @@ import os ...@@ -13,63 +13,119 @@ import os
class ADP(): class ADP():
def __init__(self, dut, send, recieve): def __init__(self, dut, send, recieve):
self.dut = dut self.dut = dut
self.log = self.dut.log
# Send Stream to NanoSoC # Send Stream to NanoSoC
self.send = send self.send = send
# Send Recieve to NanoSoC # Send Recieve to NanoSoC
self.recieve = recieve self.recieve = recieve
self.monitor_mode = False self.monitor_mode = False
# Read Block from ADP # Logging Function
def info(self, str):
self.log.info(str.strip())
# Reads Byte from ADP AXI Stream
@cocotb.coroutine @cocotb.coroutine
async def read(self, length): async def read8(self):
read_str = "" # Read Byte from ADP AXI Stream
for i in range(length):
data = await self.recieve.read() data = await self.recieve.read()
read_str += chr(data[0]) return chr(data[0])
return read_str
# Writes Byte from ADP AXI Stream
@cocotb.coroutine
async def write8(self, val):
await self.send.write([val])
# Read ADP String # Read ADP String
@cocotb.coroutine @cocotb.coroutine
async def readLine(self): async def readLine(self):
read_str = "" read_str = ""
while True: while True:
# Read Bytes from ADP AXI Stream curr_char = await self.read8()
data = await self.recieve.read()
curr_char = chr(data[0])
# Combine Bytes into String # Combine Bytes into String
if (curr_char != '\n'): if (curr_char not in ['\n',"\r"]):
read_str += curr_char read_str += curr_char
else: else:
return read_str.strip() # Return on Newline/Carrige Return
return read_str
# Write ADP String
@cocotb.coroutine
async def write8(self, val):
await self.send.write([val])
# Write ADP String # Write ADP String
@cocotb.coroutine @cocotb.coroutine
async def write(self, string): async def write_bytes(self, buf):
for i in string: for i in buf:
await self.send.write([ord(i)]) await self.write8(ord(i))
await self.send.write([0x0a]) # Append two additional newlines for reliability
read_str = await self.readLine() await self.write8(0x0a)
self.dut.log.info(read_str)
# Enter Monitor Mode # Enter Monitor Mode
@cocotb.coroutine @cocotb.coroutine
async def monitorModeEnter(self): async def monitorModeEnter(self):
self.dut.log.info("Entering ADP Monitor Mode") self.dut.log.info("Entering ADP Monitor Mode")
await self.send.write([0x1b]) await self.write8(0x1b)
# await self.write8(0x0a)
await self.wait_string("]", debug=True)
self.monitor_mode = True self.monitor_mode = True
# Exit Monitor Mode # Exit Monitor Mode
@cocotb.coroutine @cocotb.coroutine
async def monitorModeExit(self): async def monitorModeExit(self):
self.dut.log.info("Exiting ADP Monitor Mode") self.dut.log.info("Exiting ADP Monitor Mode")
await self.send.write([0x04]) await self.write8(0x04)
self.monitor_mode = False self.monitor_mode = False
# Read Block from ADP
@cocotb.coroutine
async def wait_response(self, debug=False):
read_str = ""
while read_str in ["","]]"]:
read_str = await self.readLine()
if debug == True:
self.info(read_str)
return read_str
# Read Block from ADP
@cocotb.coroutine
async def wait_string(self, string, debug=False):
read_str = ""
while read_str != string:
read_str = await self.readLine()
if debug == True:
self.info(read_str)
return read_str
# Set ADP Address Pointer
@cocotb.coroutine
async def set_address(self, address):
address_string = f"A {hex(address)}"
await self.write_bytes(address_string)
resp = await self.wait_response()
# Ensure Address Read Back Matches that of what was sent
assert resp.split()[1] == hex(address)
self.info(f"ADP Address Pointer Set: {hex(address)}")
# Get ADP Current Address Pointer
@cocotb.coroutine
async def get_address(self):
address_string = f"A"
await self.write_bytes(address_string)
resp = await self.wait_response(debug = True)
# Ensure Address Read Back Matches that of what was sent
assert resp.split()[0] == "]A"
self.info(f"ADP Address Pointer: {resp.split()[1]}")
return int(resp.split()[1],16)
# Set Address to Read From
@cocotb.coroutine
async def read_bytes(self, reads):
read_string = f"R {str(int(reads))}\n"
self.info(read_string)
await self.write(read_string)
for i in range(reads):
resp = await self.wait_response()
self.info(resp)
self.info(f"Performing {hex(reads)} Reads")
# Write Hex File into System # Write Hex File into System
@cocotb.coroutine @cocotb.coroutine
async def writeHex(self, file_name, address=0x20000000): async def writeHex(self, file_name, address=0x20000000):
...@@ -77,12 +133,13 @@ class ADP(): ...@@ -77,12 +133,13 @@ class ADP():
file_stats = os.stat(file_name) file_stats = os.stat(file_name)
file_len_in_bytes = round(file_stats.st_size/3) file_len_in_bytes = round(file_stats.st_size/3)
bytecount_hex=hex(file_len_in_bytes) bytecount_hex=hex(file_len_in_bytes)
await self.write(f'A {str(hex(address))}\n') await self.set_address(address)
await self.write(f'U {str(bytecount_hex)}\n') await self.write_bytes(f'U {str(bytecount_hex)}\n')
count = file_len_in_bytes count = file_len_in_bytes
with open(file_name, mode='r') as file: with open(file_name, mode='r') as file:
for i in range(count) : for i in range(count) :
b=file.readline() b=file.readline()
await self.write8(int(str.strip(b),16)) await self.write8(int(str.strip(b),16))
await self.write8(0x0a) await self.write8(0x0a)
self.dut.log.info("Finished Send Code") self.info(await self.wait_response())
\ No newline at end of file self.info("Finished Send Code")
\ No newline at end of file
...@@ -24,11 +24,12 @@ CLK_PERIOD = (10, "ns") ...@@ -24,11 +24,12 @@ CLK_PERIOD = (10, "ns")
# Control ADP AXI Stream bus and create ADP Driver Object # Control ADP AXI Stream bus and create ADP Driver Object
def setup_adp(dut): def setup_adp(dut):
adp_sender = AxiStreamSource(AxiStreamBus.from_prefix(dut, "txd8"), dut.XTAL1, dut.NRST, reset_active_level=False)
adp_reciever = AxiStreamSink(AxiStreamBus.from_prefix(dut, "rxd8"), dut.XTAL1, dut.NRST, reset_active_level=False)
logging.getLogger("cocotb.nanosoc_tb.rxd8").setLevel(logging.WARNING) logging.getLogger("cocotb.nanosoc_tb.rxd8").setLevel(logging.WARNING)
logging.getLogger("cocotb.nanosoc_tb.txd8").setLevel(logging.WARNING) logging.getLogger("cocotb.nanosoc_tb.txd8").setLevel(logging.WARNING)
adp_sender = AxiStreamSource(AxiStreamBus.from_prefix(dut, "txd8"), dut.XTAL1, dut.NRST, reset_active_level=False)
adp_reciever = AxiStreamSink(AxiStreamBus.from_prefix(dut, "rxd8"), dut.XTAL1, dut.NRST, reset_active_level=False)
driver = ADP(dut, adp_sender, adp_reciever) driver = ADP(dut, adp_sender, adp_reciever)
driver.write8(0x00)
return driver return driver
# Start Clocks and Reset System # Start Clocks and Reset System
...@@ -55,7 +56,6 @@ async def wait_bootcode(dut, driver): ...@@ -55,7 +56,6 @@ async def wait_bootcode(dut, driver):
# Wait for bootcode to finish # Wait for bootcode to finish
@cocotb.coroutine @cocotb.coroutine
async def wait_hello(dut, driver): async def wait_hello(dut, driver):
test_passed = "** TEST PASSED **"
while True: while True:
read_str = await driver.readLine() read_str = await driver.readLine()
dut.log.info(read_str) dut.log.info(read_str)
...@@ -89,10 +89,26 @@ async def test_adp_write(dut): ...@@ -89,10 +89,26 @@ async def test_adp_write(dut):
await wait_bootcode(dut, adp_driver) await wait_bootcode(dut, adp_driver)
dut.log.info("Bootcode Finished") dut.log.info("Bootcode Finished")
await adp_driver.monitorModeEnter() await adp_driver.monitorModeEnter()
await adp_driver.write('A 0x10000000\n') test_str = "\n\r]"
await adp_driver.write('R 4\n') read_str = await adp_driver.read8()
for i in range(4): while read_str != test_str:
dut.log.info(await adp_driver.readLine()) read_str += await adp_driver.read8()
dut.log.info(repr(read_str))
# dut.log.info(await adp_driver.wait_response())
# dut.log.info("Setting Address")
# await adp_driver.set_address(0x10000000)
# await adp_driver.get_address()
# await adp_driver.set_address(0x10000000)
# await adp_driver.read_bytes(4)
# adp_driver.info(await adp_driver.wait_response())
# await adp_driver.write_bytes('A 0x10000000\n')
# adp_driver.info(await adp_driver.wait_response())
# await adp_driver.write_bytes('A\n')
# adp_driver.info(await adp_driver.wait_response())
# await adp_driver.write('R 4\n')
# for i in range(2):
# dut.log.info(await adp_driver.readLine())
dut.log.info("ADP Write Test Complete") dut.log.info("ADP Write Test Complete")
# Basic Software Load Test # Basic Software Load Test
...@@ -106,10 +122,17 @@ async def test_adp_hello(dut): ...@@ -106,10 +122,17 @@ async def test_adp_hello(dut):
dut.log.info("Bootcode Finished") dut.log.info("Bootcode Finished")
await adp_driver.monitorModeEnter() await adp_driver.monitorModeEnter()
await adp_driver.writeHex(hello_hex, 0x20000000) await adp_driver.writeHex(hello_hex, 0x20000000)
adp_driver.info("Writen the hex")
await adp_driver.monitorModeEnter()
await adp_driver.get_address()
await adp_driver.set_address(0x20000000)
await adp_driver.read_bytes("R 4")
await adp_driver.monitorModeExit() await adp_driver.monitorModeExit()
dut.NRST.value = 0 dut.NRST.value = 0
await ClockCycles(dut.XTAL1, 2) await ClockCycles(dut.XTAL1, 2)
dut.NRST.value = 1 dut.NRST.value = 1
for i in range(4):
adp_driver.info(adp_driver.readLine())
await wait_hello(dut, adp_driver) await wait_hello(dut, adp_driver)
dut.log.info("ADP Write Test Complete") dut.log.info("ADP Write Test Complete")
......
...@@ -174,13 +174,18 @@ assign txd_tdata8_o = txstream[7:0]; ...@@ -174,13 +174,18 @@ assign txd_tdata8_o = txstream[7:0];
// 8-bit write port with extra top-bit used as valid qualifer // 8-bit write port with extra top-bit used as valid qualifer
reg [8:0] rxstream; reg [8:0] rxstream;
reg rxstream_valid;
always @(posedge aclk or negedge aresetn) always @(posedge aclk or negedge aresetn)
if (!aresetn) if (!aresetn) begin
rxstream <= 9'b000000000; rxstream <= 9'b000000000;
else if (!rxstream[8] & rxd_tvalid_i) // if empty can accept valid RX stream data rxstream_valid <= 1'b0;
end else if (!rxstream[8] & rxd_tvalid_i) begin// if empty can accept valid RX stream data
rxstream_valid <= 1'b1;
rxstream[8:0] <= {1'b1,rxd_tdata8_i}; rxstream[8:0] <= {1'b1,rxd_tdata8_i};
else if (rxstream[8] & ft_clk_rising & ft_cmd_rxd & (ft_state==5'b01110)) // hold until final shift completion end else if (rxstream[8] & ft_clk_rising & ft_cmd_rxd & (ft_state==5'b01110)) begin// hold until final shift completion
rxstream[8] <= 1'b0; rxstream[8] <= 1'b0;
rxstream_valid <= 1'b0;
end
assign rxd_tready_o = !rxstream[8]; // ready until loaded assign rxd_tready_o = !rxstream[8]; // ready until loaded
...@@ -192,7 +197,7 @@ always @(posedge aclk or negedge aresetn) ...@@ -192,7 +197,7 @@ always @(posedge aclk or negedge aresetn)
txd_sr <= 8'b00000000; txd_sr <= 8'b00000000;
else if (ft_ssn) // sync reset else if (ft_ssn) // sync reset
txd_sr <= 8'b00000000; txd_sr <= 8'b00000000;
else if (ft_clk_falling & ft_cmd_rxd & rxd_tvalid_i & (ft_state == 5'b00111)) else if (ft_clk_falling & ft_cmd_rxd & rxstream_valid & (ft_state == 5'b00111))
txd_sr <= rxstream[7:0]; txd_sr <= rxstream[7:0];
else if (ft_clk_rising & ft_cmd_rxd & (ft_state[4:3] == 2'b01)) //serial shift else if (ft_clk_rising & ft_cmd_rxd & (ft_state[4:3] == 2'b01)) //serial shift
txd_sr <= {1'b0,txd_sr[7:1]}; txd_sr <= {1'b0,txd_sr[7:1]};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment