Skip to content
Snippets Groups Projects
Commit dc744f46 authored by Paul-Winpenny's avatar Paul-Winpenny
Browse files

Graph maker is now inside of the ros2 package to make it easier to save the graph.

parent 8c77254c
No related branches found
No related tags found
No related merge requests found
File added
import json
import time
import os
from realtime_location_cli_only import AnchorTagCLI
class GraphMaker:
def __init__(self, port="COM11", testing=False):
self.app = AnchorTagCLI(port=port, testing=testing)
self.graph = {
"name": "Lab 1 Extended",
"nodes": [],
"connections": []
}
self.previous_node_index = None
def call_bpm_and_store_anchors(self):
self.app.call_bpm()
for name, (x, y) in self.app.anchors.items():
self.graph["nodes"].append({
"name": name,
"x": x,
"y": y
})
# Initialize connections matrix with False
num_nodes = len(self.graph["nodes"])
self.graph["connections"] = [[False] * num_nodes for _ in range(num_nodes)]
print("Anchor coordinates stored successfully.")
def start_session(self):
print("Waiting for 'start' command...")
while True:
user_input = input("Enter 'start' to begin adding nodes or 'quit' to exit: ").strip().lower()
if user_input == "start":
print("Session started. Type 'save' to add nodes, 'finish' to save the graph, or 'quit' to exit.")
break
elif user_input == "quit":
print("Exiting...")
exit()
def add_node_and_connect(self):
print(self.app.tag_distances)
distances = self.app.serial_buffer.getRangingDistances()
tag1_x, tag1_y = self.app.update_distances_and_calculate_tags()
print(f"{tag1_x}, {tag1_y}")
if tag1_x is None or tag1_y is None:
print("Failed to determine tag position. Skipping node creation.")
return
new_node_index = len(self.graph["nodes"])
# Add new node
self.graph["nodes"].append({
"name": f"Node{new_node_index+1}",
"x": tag1_x,
"y": tag1_y
})
# Dynamically resize the connections matrix
for row in self.graph["connections"]:
row.append(False) # Add a column for the new node
self.graph["connections"].append([False] * (new_node_index + 1)) # Add a new row
# Update connections
if self.previous_node_index is not None:
self.graph["connections"][self.previous_node_index][new_node_index] = True
self.graph["connections"][new_node_index][self.previous_node_index] = True
self.previous_node_index = new_node_index
print(f"Node {new_node_index+1} added at ({tag1_x:.2f}, {tag1_y:.2f}).")
def save_graph(self):
directory = r"D:\Github\robobin\ros2\src\robobin\robobin\graphs" #Replace with whereever your graphs folder is in the ros2 workspace
os.makedirs(directory, exist_ok=True) # Create the directory if it doesn't exist
file_path = os.path.join(directory, "graph.json")
with open(file_path, "w") as f:
json.dump(self.graph, f, indent=2)
print(f"Graph saved to '{file_path}'.")
def run(self):
while True:
user_input = input("Enter command ('bpm', 'start', 'save', 'finish', 'quit'): ").strip().lower()
if user_input == "bpm":
self.call_bpm_and_store_anchors()
elif user_input == "start":
self.start_session()
while True:
user_input = input("Enter 'save' to add a node, 'finish' to save and exit, or 'quit' to exit: ").strip().lower()
if user_input == "save":
self.add_node_and_connect()
elif user_input == "finish":
self.save_graph()
print("Session finished.")
exit()
elif user_input == "quit":
print("Exiting without saving.")
exit()
else:
print("Invalid command. Try 'save', 'finish', or 'quit'.")
elif user_input == "quit":
print("Exiting...")
break
else:
print("Invalid command. Try 'bpm', 'start', 'save', 'finish', or 'quit'.")
if __name__ == "__main__":
graph_maker = GraphMaker(port="/dev/tty.usbmodem14101", testing=False)
graph_maker.run()
import time
import numpy as np
import serial
from scipy.optimize import least_squares
# Define test values at the top of the file for easy modification
TEST_MEASURED_DISTANCES = [302, 463, 286, 304, 418, 328]
TEST_TAG1_DISTANCES = [22, 107, 246, 295]
TEST_TAG2_DISTANCES = [100, 100, 100, 100]
class SerialBuffer:
def __init__(self, port):
self.ser = serial.Serial(port, 115200, timeout=1)
def readFromDevice(self, expectedLines=1):
lines = []
# Attempt to read expected lines; if device is slow, you may need retries
while len(lines) < expectedLines:
if self.ser.in_waiting:
line = self.ser.readline().decode().strip()
if line:
lines.append(line)
else:
time.sleep(0.01)
return lines
def getBeaconPositioningDistances(self):
"""Reads the measured distances (A,E,D,B,F,C) from the device."""
self.writeToDevice("bpm")
buffer = self.readFromDevice(1)[0]
values = list(map(float, buffer.split(" ")))
return values
def getRangingDistances(self):
"""Reads the distances from the tags to the anchors."""
self.writeToDevice("rng")
lines = self.readFromDevice(2)
print(lines)
distances = []
# First line: Tag 1 distances
distances.append(list(map(float, lines[0][1:].split(" "))))
# Second line: Tag 2 distances or "0"
if lines[1] != "1":
distances.append(list(map(float, lines[1][1:].split(" "))))
else:
distances.append(None)
return distances
def writeToDevice(self, value):
self.ser.write((value + "\n").encode())
def __del__(self):
print("Closing port")
self.ser.close()
class AnchorTagCLI:
def __init__(self, port="/dev/tty.usbmodem14101", testing=False):
self.testing = testing
self.serial_buffer = SerialBuffer(port)
# Distances between anchors (A,E,D,B,F,C)
# Corresponding to the measured_distances in original code.
self.measured_distances = [0.0]*6
# Distances from Tag 1 to anchors: A1, A2, A3, A4
self.tag_distances = {"A1": 0.0, "A2": 0.0, "A3": 0.0, "A4": 0.0}
# Distances from Tag 2 to anchors: A1, A2, A3, A4
self.tag2_distances = {"A1": 0.0, "A2": 0.0, "A3": 0.0, "A4": 0.0}
self.anchors = {}
self.anchors_coords_known = False
if self.testing:
# Set predefined test distances
for i, dist in enumerate(TEST_MEASURED_DISTANCES):
self.measured_distances[i] = dist
for (anchor, dist) in zip(self.tag_distances.keys(), TEST_TAG1_DISTANCES):
self.tag_distances[anchor] = dist
for (anchor, dist) in zip(self.tag2_distances.keys(), TEST_TAG2_DISTANCES):
self.tag2_distances[anchor] = dist
def determine_anchor_coords(self):
try:
measured_distances = np.array(self.measured_distances)
noise_level = 0.0
measured_distances_noisy = measured_distances + np.random.uniform(-noise_level, noise_level, size=len(measured_distances))
# Variables: x_B, y_B, x_C, y_C, y_A
initial_guess = [120, 100, 150, 200, 50] # [x_B, y_B, x_C, y_C, y_A]
maxBounds = 30000
bounds = ([0, 0, 0, 0, 0], [maxBounds] * 5)
def error_function(variables, measured):
x_B, y_B, x_C, y_C, y_A = variables
# measured: [A, E, D, B, F, C]
a_measured = measured[0]
e_measured = measured[1]
d_measured = measured[2]
b_measured = measured[3]
f_measured = measured[4]
c_measured = measured[5]
# A=(0,y_A), B=(x_B,y_B), C=(x_C,y_C), D=(0,0)
a_calc = np.sqrt((x_B - 0)**2 + (y_B - y_A)**2) # A-B
b_calc = np.sqrt((x_C - x_B)**2 + (y_C - y_B)**2) # B-C
c_calc = np.sqrt(x_C**2 + y_C**2) # C-D
d_calc = y_A # A-D
e_calc = np.sqrt(x_C**2 + (y_C - y_A)**2) # A-C
f_calc = np.sqrt(x_B**2 + y_B**2) # B-D
return [
a_calc - a_measured,
b_calc - b_measured,
c_calc - c_measured,
d_calc - d_measured,
e_calc - e_measured,
f_calc - f_measured
]
# Run least squares optimization
result_noisy = least_squares(
error_function,
initial_guess,
args=(measured_distances_noisy,),
bounds=bounds,
loss='soft_l1'
)
optimized_coords_noisy = result_noisy.x
self.anchors = {
"A1": (0, optimized_coords_noisy[4]),
"A2": (optimized_coords_noisy[0], optimized_coords_noisy[1]),
"A3": (optimized_coords_noisy[2], optimized_coords_noisy[3]),
"A4": (0, 0),
}
return {k: (round(v[0], 2), round(v[1], 2)) for k, v in self.anchors.items()}
except Exception as e:
print(f"Error generating anchors: {e}")
def calculate_tag_coordinates(self, tag_distances):
if not self.anchors_coords_known or len(self.anchors) < 3:
return None, None
available_beacons = []
available_distances = []
for key in tag_distances.keys():
d = max(float(tag_distances[key]), 0)
available_distances.append(d)
available_beacons.append(self.anchors[key])
if len(available_beacons) < 3:
return None, None
def error_function(vars):
x, y = vars
residuals = []
for (bx, by), d_measured in zip(available_beacons, available_distances):
d_computed = np.sqrt((x - bx)**2 + (y - by)**2)
residuals.append(d_computed - d_measured)
return residuals
beacon_xs = [b[0] for b in available_beacons]
beacon_ys = [b[1] for b in available_beacons]
initial_guess = [np.mean(beacon_xs), np.mean(beacon_ys)]
x_min = min(beacon_xs) - 100
x_max = max(beacon_xs) + 100
y_min = min(beacon_ys) - 100
y_max = max(beacon_ys) + 100
bounds = ([x_min, y_min], [x_max, y_max])
result = least_squares(error_function, initial_guess, bounds=bounds, loss='soft_l1')
x_tag, y_tag = result.x
return x_tag, y_tag
def call_bpm(self):
if self.testing:
beacon_distances = TEST_MEASURED_DISTANCES
else:
beacon_distances = self.serial_buffer.getBeaconPositioningDistances()
for i, distance in enumerate(beacon_distances):
if i < len(self.measured_distances):
self.measured_distances[i] = distance
determined_anchor_coords = self.determine_anchor_coords()
if determined_anchor_coords:
self.anchors_coords_known = True
print("Anchor coordinates determined:")
for anchor, coords in determined_anchor_coords.items():
print(f"{anchor}: {coords}")
def update_distances_and_calculate_tags(self):
if not self.anchors_coords_known:
return
if self.testing:
ranging_distances = [TEST_TAG1_DISTANCES, TEST_TAG2_DISTANCES]
else:
ranging_distances = self.serial_buffer.getRangingDistances()
# Update tag 1 distances
if ranging_distances[0] is not None:
for i, distance in enumerate(ranging_distances[0]):
anchor = list(self.tag_distances.keys())[i]
self.tag_distances[anchor] = distance
# Update tag 2 distances
if ranging_distances[1] is not None:
for i, distance in enumerate(ranging_distances[1]):
anchor = list(self.tag2_distances.keys())[i]
self.tag2_distances[anchor] = distance
# Now calculate both tags
tag1_x, tag1_y = self.calculate_tag_coordinates(self.tag_distances)
valid_tag2_distances = [dist for dist in self.tag2_distances.values() if dist > 0]
# Check if there are enough valid distances for Tag 2
if len(valid_tag2_distances) < 3:
print(f"Insufficient valid distances for Tag 2: {len(valid_tag2_distances)} provided.")
tag2_x, tag2_y = None, None
else:
tag2_x, tag2_y = self.calculate_tag_coordinates(self.tag2_distances)
print("Tag Positions:")
if tag1_x is not None and tag1_y is not None:
print(f"Tag 1: ({tag1_x:.2f}, {tag1_y:.2f})")
else:
print("Tag 1: Not enough data")
if tag2_x is not None and tag2_y is not None:
print(f"Tag 2: ({tag2_x:.2f}, {tag2_y:.2f})")
else:
print("Tag 2: Not enough data")
if tag1_x is not None and tag1_y is not None and tag2_x is not None and tag2_y is not None:
dx = tag2_x - tag1_x
dy = tag2_y - tag1_y
displacement = np.sqrt(dx**2 + dy**2)
angle_deg = np.degrees(np.arctan2(dy, dx))
if angle_deg < 0:
angle_deg += 360
print(f"Direction from Tag1 to Tag2: dx={dx:.2f}, dy={dy:.2f}, displacement={displacement:.2f}, angle={angle_deg:.2f}°")
return tag1_x, tag1_y
if __name__ == "__main__":
app = AnchorTagCLI(port="/dev/tty.usbmodem14101", testing=False)
while True:
user_input = input("Enter command ('bpm' to set anchors, or 'quit' to exit): ").strip().lower()
if user_input == "bpm":
app.call_bpm()
print("Switching to continuous ranging updates (simulating 'rng' messages)...")
while True:
app.update_distances_and_calculate_tags()
time.sleep(1)
elif user_input == "quit":
print("Exiting program.")
break
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment