diff --git a/ros2/src/robobin/robobin/helpers/__pycache__/realtime_location_cli_only.cpython-312.pyc b/ros2/src/robobin/robobin/helpers/__pycache__/realtime_location_cli_only.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c9190690f3276b2c8cc4075a9a6b33e305014630 Binary files /dev/null and b/ros2/src/robobin/robobin/helpers/__pycache__/realtime_location_cli_only.cpython-312.pyc differ diff --git a/ros2/src/robobin/robobin/helpers/graph_maker.py b/ros2/src/robobin/robobin/helpers/graph_maker.py new file mode 100644 index 0000000000000000000000000000000000000000..c197c10b7eea1a63721b6cf40554a57b79c13d75 --- /dev/null +++ b/ros2/src/robobin/robobin/helpers/graph_maker.py @@ -0,0 +1,112 @@ +import json +import time +import os +from realtime_location_cli_only import AnchorTagCLI + +class GraphMaker: + def __init__(self, port="COM11", testing=False): + self.app = AnchorTagCLI(port=port, testing=testing) + self.graph = { + "name": "Lab 1 Extended", + "nodes": [], + "connections": [] + } + self.previous_node_index = None + + def call_bpm_and_store_anchors(self): + self.app.call_bpm() + for name, (x, y) in self.app.anchors.items(): + self.graph["nodes"].append({ + "name": name, + "x": x, + "y": y + }) + + # Initialize connections matrix with False + num_nodes = len(self.graph["nodes"]) + self.graph["connections"] = [[False] * num_nodes for _ in range(num_nodes)] + + print("Anchor coordinates stored successfully.") + + def start_session(self): + print("Waiting for 'start' command...") + while True: + user_input = input("Enter 'start' to begin adding nodes or 'quit' to exit: ").strip().lower() + if user_input == "start": + print("Session started. Type 'save' to add nodes, 'finish' to save the graph, or 'quit' to exit.") + break + elif user_input == "quit": + print("Exiting...") + exit() + + def add_node_and_connect(self): + print(self.app.tag_distances) + distances = self.app.serial_buffer.getRangingDistances() + + tag1_x, tag1_y = self.app.update_distances_and_calculate_tags() + print(f"{tag1_x}, {tag1_y}") + if tag1_x is None or tag1_y is None: + print("Failed to determine tag position. Skipping node creation.") + return + + new_node_index = len(self.graph["nodes"]) + + # Add new node + self.graph["nodes"].append({ + "name": f"Node{new_node_index+1}", + "x": tag1_x, + "y": tag1_y + }) + + # Dynamically resize the connections matrix + for row in self.graph["connections"]: + row.append(False) # Add a column for the new node + self.graph["connections"].append([False] * (new_node_index + 1)) # Add a new row + + # Update connections + if self.previous_node_index is not None: + self.graph["connections"][self.previous_node_index][new_node_index] = True + self.graph["connections"][new_node_index][self.previous_node_index] = True + + self.previous_node_index = new_node_index + + print(f"Node {new_node_index+1} added at ({tag1_x:.2f}, {tag1_y:.2f}).") + + def save_graph(self): + directory = r"D:\Github\robobin\ros2\src\robobin\robobin\graphs" #Replace with whereever your graphs folder is in the ros2 workspace + os.makedirs(directory, exist_ok=True) # Create the directory if it doesn't exist + file_path = os.path.join(directory, "graph.json") + with open(file_path, "w") as f: + json.dump(self.graph, f, indent=2) + print(f"Graph saved to '{file_path}'.") + + + def run(self): + while True: + user_input = input("Enter command ('bpm', 'start', 'save', 'finish', 'quit'): ").strip().lower() + if user_input == "bpm": + self.call_bpm_and_store_anchors() + elif user_input == "start": + self.start_session() + while True: + user_input = input("Enter 'save' to add a node, 'finish' to save and exit, or 'quit' to exit: ").strip().lower() + if user_input == "save": + self.add_node_and_connect() + elif user_input == "finish": + self.save_graph() + print("Session finished.") + exit() + elif user_input == "quit": + print("Exiting without saving.") + exit() + else: + print("Invalid command. Try 'save', 'finish', or 'quit'.") + elif user_input == "quit": + print("Exiting...") + break + else: + print("Invalid command. Try 'bpm', 'start', 'save', 'finish', or 'quit'.") + +if __name__ == "__main__": + graph_maker = GraphMaker(port="/dev/tty.usbmodem14101", testing=False) + graph_maker.run() diff --git a/ros2/src/robobin/robobin/helpers/realtime_location_cli_only.py b/ros2/src/robobin/robobin/helpers/realtime_location_cli_only.py new file mode 100644 index 0000000000000000000000000000000000000000..a314cec3db27597e1303cf8ca2d0a00523600dbe --- /dev/null +++ b/ros2/src/robobin/robobin/helpers/realtime_location_cli_only.py @@ -0,0 +1,263 @@ +import time +import numpy as np +import serial +from scipy.optimize import least_squares + +# Define test values at the top of the file for easy modification +TEST_MEASURED_DISTANCES = [302, 463, 286, 304, 418, 328] +TEST_TAG1_DISTANCES = [22, 107, 246, 295] +TEST_TAG2_DISTANCES = [100, 100, 100, 100] + +class SerialBuffer: + def __init__(self, port): + self.ser = serial.Serial(port, 115200, timeout=1) + + def readFromDevice(self, expectedLines=1): + lines = [] + # Attempt to read expected lines; if device is slow, you may need retries + while len(lines) < expectedLines: + if self.ser.in_waiting: + line = self.ser.readline().decode().strip() + if line: + lines.append(line) + else: + time.sleep(0.01) + return lines + + def getBeaconPositioningDistances(self): + """Reads the measured distances (A,E,D,B,F,C) from the device.""" + self.writeToDevice("bpm") + buffer = self.readFromDevice(1)[0] + values = list(map(float, buffer.split(" "))) + return values + + def getRangingDistances(self): + """Reads the distances from the tags to the anchors.""" + self.writeToDevice("rng") + lines = self.readFromDevice(2) + print(lines) + distances = [] + # First line: Tag 1 distances + distances.append(list(map(float, lines[0][1:].split(" ")))) + # Second line: Tag 2 distances or "0" + if lines[1] != "1": + distances.append(list(map(float, lines[1][1:].split(" ")))) + else: + distances.append(None) + return distances + + def writeToDevice(self, value): + self.ser.write((value + "\n").encode()) + + def __del__(self): + print("Closing port") + self.ser.close() + + +class AnchorTagCLI: + def __init__(self, port="/dev/tty.usbmodem14101", testing=False): + self.testing = testing + self.serial_buffer = SerialBuffer(port) + + # Distances between anchors (A,E,D,B,F,C) + # Corresponding to the measured_distances in original code. + self.measured_distances = [0.0]*6 + + # Distances from Tag 1 to anchors: A1, A2, A3, A4 + self.tag_distances = {"A1": 0.0, "A2": 0.0, "A3": 0.0, "A4": 0.0} + + # Distances from Tag 2 to anchors: A1, A2, A3, A4 + self.tag2_distances = {"A1": 0.0, "A2": 0.0, "A3": 0.0, "A4": 0.0} + + self.anchors = {} + self.anchors_coords_known = False + + if self.testing: + # Set predefined test distances + for i, dist in enumerate(TEST_MEASURED_DISTANCES): + self.measured_distances[i] = dist + + for (anchor, dist) in zip(self.tag_distances.keys(), TEST_TAG1_DISTANCES): + self.tag_distances[anchor] = dist + + for (anchor, dist) in zip(self.tag2_distances.keys(), TEST_TAG2_DISTANCES): + self.tag2_distances[anchor] = dist + + def determine_anchor_coords(self): + try: + measured_distances = np.array(self.measured_distances) + noise_level = 0.0 + measured_distances_noisy = measured_distances + np.random.uniform(-noise_level, noise_level, size=len(measured_distances)) + + # Variables: x_B, y_B, x_C, y_C, y_A + initial_guess = [120, 100, 150, 200, 50] # [x_B, y_B, x_C, y_C, y_A] + maxBounds = 30000 + bounds = ([0, 0, 0, 0, 0], [maxBounds] * 5) + + def error_function(variables, measured): + x_B, y_B, x_C, y_C, y_A = variables + # measured: [A, E, D, B, F, C] + a_measured = measured[0] + e_measured = measured[1] + d_measured = measured[2] + b_measured = measured[3] + f_measured = measured[4] + c_measured = measured[5] + + # A=(0,y_A), B=(x_B,y_B), C=(x_C,y_C), D=(0,0) + a_calc = np.sqrt((x_B - 0)**2 + (y_B - y_A)**2) # A-B + b_calc = np.sqrt((x_C - x_B)**2 + (y_C - y_B)**2) # B-C + c_calc = np.sqrt(x_C**2 + y_C**2) # C-D + d_calc = y_A # A-D + e_calc = np.sqrt(x_C**2 + (y_C - y_A)**2) # A-C + f_calc = np.sqrt(x_B**2 + y_B**2) # B-D + + return [ + a_calc - a_measured, + b_calc - b_measured, + c_calc - c_measured, + d_calc - d_measured, + e_calc - e_measured, + f_calc - f_measured + ] + + # Run least squares optimization + result_noisy = least_squares( + error_function, + initial_guess, + args=(measured_distances_noisy,), + bounds=bounds, + loss='soft_l1' + ) + optimized_coords_noisy = result_noisy.x + + self.anchors = { + "A1": (0, optimized_coords_noisy[4]), + "A2": (optimized_coords_noisy[0], optimized_coords_noisy[1]), + "A3": (optimized_coords_noisy[2], optimized_coords_noisy[3]), + "A4": (0, 0), + } + return {k: (round(v[0], 2), round(v[1], 2)) for k, v in self.anchors.items()} + except Exception as e: + print(f"Error generating anchors: {e}") + + def calculate_tag_coordinates(self, tag_distances): + if not self.anchors_coords_known or len(self.anchors) < 3: + return None, None + + available_beacons = [] + available_distances = [] + for key in tag_distances.keys(): + d = max(float(tag_distances[key]), 0) + available_distances.append(d) + available_beacons.append(self.anchors[key]) + + if len(available_beacons) < 3: + return None, None + + def error_function(vars): + x, y = vars + residuals = [] + for (bx, by), d_measured in zip(available_beacons, available_distances): + d_computed = np.sqrt((x - bx)**2 + (y - by)**2) + residuals.append(d_computed - d_measured) + return residuals + + beacon_xs = [b[0] for b in available_beacons] + beacon_ys = [b[1] for b in available_beacons] + initial_guess = [np.mean(beacon_xs), np.mean(beacon_ys)] + + x_min = min(beacon_xs) - 100 + x_max = max(beacon_xs) + 100 + y_min = min(beacon_ys) - 100 + y_max = max(beacon_ys) + 100 + bounds = ([x_min, y_min], [x_max, y_max]) + + result = least_squares(error_function, initial_guess, bounds=bounds, loss='soft_l1') + x_tag, y_tag = result.x + return x_tag, y_tag + + def call_bpm(self): + if self.testing: + beacon_distances = TEST_MEASURED_DISTANCES + else: + beacon_distances = self.serial_buffer.getBeaconPositioningDistances() + + for i, distance in enumerate(beacon_distances): + if i < len(self.measured_distances): + self.measured_distances[i] = distance + + determined_anchor_coords = self.determine_anchor_coords() + if determined_anchor_coords: + self.anchors_coords_known = True + print("Anchor coordinates determined:") + for anchor, coords in determined_anchor_coords.items(): + print(f"{anchor}: {coords}") + + def update_distances_and_calculate_tags(self): + if not self.anchors_coords_known: + return + + if self.testing: + ranging_distances = [TEST_TAG1_DISTANCES, TEST_TAG2_DISTANCES] + else: + ranging_distances = self.serial_buffer.getRangingDistances() + + # Update tag 1 distances + if ranging_distances[0] is not None: + for i, distance in enumerate(ranging_distances[0]): + anchor = list(self.tag_distances.keys())[i] + self.tag_distances[anchor] = distance + + # Update tag 2 distances + if ranging_distances[1] is not None: + for i, distance in enumerate(ranging_distances[1]): + anchor = list(self.tag2_distances.keys())[i] + self.tag2_distances[anchor] = distance + + # Now calculate both tags + tag1_x, tag1_y = self.calculate_tag_coordinates(self.tag_distances) + valid_tag2_distances = [dist for dist in self.tag2_distances.values() if dist > 0] + + # Check if there are enough valid distances for Tag 2 + if len(valid_tag2_distances) < 3: + print(f"Insufficient valid distances for Tag 2: {len(valid_tag2_distances)} provided.") + tag2_x, tag2_y = None, None + else: + tag2_x, tag2_y = self.calculate_tag_coordinates(self.tag2_distances) + + print("Tag Positions:") + if tag1_x is not None and tag1_y is not None: + print(f"Tag 1: ({tag1_x:.2f}, {tag1_y:.2f})") + else: + print("Tag 1: Not enough data") + + if tag2_x is not None and tag2_y is not None: + print(f"Tag 2: ({tag2_x:.2f}, {tag2_y:.2f})") + else: + print("Tag 2: Not enough data") + + + if tag1_x is not None and tag1_y is not None and tag2_x is not None and tag2_y is not None: + dx = tag2_x - tag1_x + dy = tag2_y - tag1_y + displacement = np.sqrt(dx**2 + dy**2) + angle_deg = np.degrees(np.arctan2(dy, dx)) + if angle_deg < 0: + angle_deg += 360 + print(f"Direction from Tag1 to Tag2: dx={dx:.2f}, dy={dy:.2f}, displacement={displacement:.2f}, angle={angle_deg:.2f}°") + return tag1_x, tag1_y + +if __name__ == "__main__": + app = AnchorTagCLI(port="/dev/tty.usbmodem14101", testing=False) + while True: + user_input = input("Enter command ('bpm' to set anchors, or 'quit' to exit): ").strip().lower() + if user_input == "bpm": + app.call_bpm() + print("Switching to continuous ranging updates (simulating 'rng' messages)...") + while True: + app.update_distances_and_calculate_tags() + time.sleep(1) + elif user_input == "quit": + print("Exiting program.") + break \ No newline at end of file