diff --git a/Wireless_Communication/UWB/Beacons_tag_position/beacons_D_origin_modified.py b/Wireless_Communication/UWB/Beacons_tag_position/beacons_D_origin_modified.py index eda0501ddfcd1f0cb2bee8fd56f1bc3584433631..46b61848deb5d03e2be30e647bba847df3de9aa9 100644 --- a/Wireless_Communication/UWB/Beacons_tag_position/beacons_D_origin_modified.py +++ b/Wireless_Communication/UWB/Beacons_tag_position/beacons_D_origin_modified.py @@ -4,10 +4,6 @@ import numpy as np # Measured distances arrive in order: A, E, D, B, F, C measured_distances = [1496.692877, 1477.462413, 677.287532, 947.921123, 1358.796385, 922.593196] -# Introduce ±10 cm of noise into the measurements -noise_level = 0 # Set to zero for no noise -measured_distances_noisy = measured_distances + np.random.uniform(-noise_level, noise_level, size=len(measured_distances)) - # Automatically generate a robust initial guess def generate_initial_guess(measured_distances): y_A = measured_distances[2] # Distance from A to D @@ -73,27 +69,27 @@ def error_function(variables, measured): return [r_a, r_b, r_c, r_d, r_e, r_f, penalty] # Generate the initial guess and bounds -initial_guess = generate_initial_guess(measured_distances_noisy) -lower_bounds, upper_bounds = generate_bounds(measured_distances_noisy) +initial_guess = generate_initial_guess(measured_distances) +lower_bounds, upper_bounds = generate_bounds(measured_distances) print("Lower bounds:", lower_bounds) print("Upper bounds:", upper_bounds) print("Initial guess:", initial_guess) # Run least squares optimization -result_noisy = least_squares( +result = least_squares( error_function, initial_guess, - args=(measured_distances_noisy,), + args=(measured_distances,), bounds=(lower_bounds, upper_bounds), loss='soft_l1' ) # Extract optimized coordinates -optimized_coords_noisy = result_noisy.x -x_B, y_B, x_C, y_C, y_A = optimized_coords_noisy -print("Optimized coordinates with noise:", optimized_coords_noisy) +optimized_coords = result.x +x_B, y_B, x_C, y_C, y_A = optimized_coords +print("Optimized coordinates with noise:", optimized_coords) # Calculate and print residuals -residuals_noisy = error_function(optimized_coords_noisy, measured_distances_noisy)[:-1] # Ignore penalty -print("\nResiduals with noisy measurements:", residuals_noisy) +residuals = error_function(optimized_coords, measured_distances)[:-1] # Ignore penalty +print("\nResiduals with noisy measurements:", residuals) diff --git a/ros2/src/robobin/robobin/uwb_navigation_node.py b/ros2/src/robobin/robobin/uwb_navigation_node.py index 2ce81956db0f2383a13367cf874c56c9a916b075..e9bf547fa80bbf65d26590233256a25405557698 100644 --- a/ros2/src/robobin/robobin/uwb_navigation_node.py +++ b/ros2/src/robobin/robobin/uwb_navigation_node.py @@ -1,6 +1,5 @@ import json import os -import time import numpy as np from scipy.optimize import least_squares from geometry_msgs.msg import Point, Twist @@ -11,54 +10,93 @@ from sensor_msgs.msg import Imu # from transforms3d.euler import quat2euler import math -class SerialBuffer: - def __init__(self, port): - import serial - self.ser = serial.Serial(port, 115200, timeout=1) - self.flip = False - def readFromDevice(self, expectedLines=1): - lines = [] - while len(lines) < expectedLines: - if self.ser.in_waiting: - line = self.ser.readline().decode().strip() - if line: - lines.append(line) - else: - time.sleep(0.01) - return lines +class SerialController: + def __init__(self, port, windowSize = 10, minSuccessfulValues = 5, maxRetries = 20): + self.ser = serial.Serial(port, 115200, timeout = 1) + self.windowSize = windowSize + self.minSuccessfulValues = minSuccessfulValues + self.maxRetries = maxRetries + + self.tag1Window = [[] for _ in range(4)] + self.tag2Window = [[] for _ in range(4)] + def readFromDevice(self, expectedLines = 1): + lines = [] + while self.ser.in_waiting or len(lines) < expectedLines: + lines.append(self.ser.readline().decode()) + return list(map(lambda line: line.strip(), lines)) + + def getBPMOnce(self): + self.writeToDevice("bpm") + buffer = self.readFromDevice(1)[0].split(" ") + print(buffer) + if len(buffer) == 6: #received all bpm distances + return list(map(float, buffer)) + elif len(buffer) == 4: #live check saying which anchors are unreachable + return list(map(lambda x: bool(int(x)), buffer)) + else: + return [] + # return int(buffer[0]) if len(buffer) else None + def getBeaconPositioningDistances(self): - # Mocked data for now + bpmWindow = [] + successfulValues = 0 + currentTry = 0 + anchorsOnline = [] + while (successfulValues < self.minSuccessfulValues) and (currentTry < self.maxRetries): + values = self.getBPMOnce() + if len(values) == 4: #bpm returned live check, some anchors are offline + anchorsOnline = values + if len(values) == 6: #bpm distances retrieved + if all(map(lambda x: x >= 0.0, values)): #ensure all values are non-negative + bpmWindow.append(values) + successfulValues += 1 + currentTry += 1 - # return [302, 463, 286, 304, 418, 328] - self.writeToDevice("bpm") - buffer = self.readFromDevice(1)[0] - values = list(map(float, buffer.split(" "))) - return values - - def getRangingDistances(self): - # Mocked data for now - # Tag1 distances - # Tag2 distances - # self.flip = not self.flip - # if self.flip: - # return [[100, 200, 300, 400], [200, 300, 400, 500]] - # else: - # return [[200, 300, 400, 500], [100, 200, 300, 400]] + if currentTry >= self.maxRetries: + return anchorsOnline + else: + return [sum(map(lambda bpmEntry: bpmEntry[i], bpmWindow)) / len(bpmWindow) for i in range(6)] + + def getRNGOnce(self): self.writeToDevice("rng") lines = self.readFromDevice(2) distances = [] - distances.append(list(map(float, lines[0][1:].split(" ")))) - if lines[1] != "1": - distances.append(list(map(float, lines[1][1:].split(" ")))) + distances.append(list(map(float, lines[0].split(" ")))) + if len(lines[1]) > 1: + distances.append(list(map(float, lines[1].split(" ")))) else: distances.append(None) return distances + + def getRangingDistances(self): + tag1Distances, tag2Distances = self.getRNGOnce() + if (all(map(lambda x: x >= 0.0, tag1Distances))): #all values are non-negative + if len(self.tag1Window[0]) < self.windowSize: + for i, dist in enumerate(tag1Distances): + (self.tag1Window[i]).append(dist) + else: + for i, dist in enumerate(tag1Distances): + self.tag1Window[i] = self.tag1Window[i][1:] + [dist] + if tag2Distances == None: + self.tag2Window = [[] for _ in range(4)] + else: + if (all(map(lambda x: x >= 0.0, tag2Distances))): #all values are non-negative + if len(self.tag2Window[0]) < self.windowSize: + for i, dist in enumerate(tag2Distances): + self.tag2Window[i].append(dist) + else: + for i, dist in enumerate(tag2Distances): + self.tag2Window[i] = self.tag2Window[i][1:] + [dist] + + return [[(sum(distArray) / (len(distArray) - distArray.count(0.0)) if (len(distArray) - distArray.count(0.0)) else 0.0) for distArray in self.tag1Window], [(sum(distArray) / (len(distArray) - distArray.count(0.0)) if (len(distArray) - distArray.count(0.0)) else 0.0) for distArray in self.tag2Window] if len(self.tag2Window[0]) else None] + def writeToDevice(self, value): self.ser.write((value + "\n").encode()) - + def __del__(self): + print("Closing port") self.ser.close() class UWBNode(Node): @@ -78,8 +116,9 @@ class UWBNode(Node): self.current_yaw = 0.0 # Store the current IMU yaw self.uwb_to_imu_offset = 0.0 # Offset between UWB and IMU heading self.mode = "Manual" - self.serial_buffer = SerialBuffer("/dev/ttyACM0") + self.serial_controller = SerialController("/dev/ttyACM0") self.anchors = {} + self.anchorHeight = 250 self.anchors_coords_known = False self.previous_tag1_position = None @@ -98,7 +137,7 @@ class UWBNode(Node): self.current_target_index = 0 # Index in the path self.call_mode_active = False workspace_root = os.getcwd() # Current working directory - anchors_file_path = os.path.abspath(os.path.join(workspace_root,"src", "robobin", "robobin", "graphs", "anchors.json")) + anchors_file_path = os.path.abspath(os.path.join(workspace_root, "src", "robobin", "robobin", "graphs", "anchors.json")) if os.path.exists(anchors_file_path): try: with open(anchors_file_path, 'r') as f: @@ -119,7 +158,7 @@ class UWBNode(Node): def call_bpm(self): try: # Retrieve beacon distances and determine anchor coordinates - beacon_distances = self.serial_buffer.getBeaconPositioningDistances() + beacon_distances = self.serial_controller.getBeaconPositioningDistances() self.get_logger().info(f"Retrieved values {beacon_distances}") self.determine_anchor_coords(beacon_distances) self.anchors_coords_known = True @@ -163,7 +202,6 @@ class UWBNode(Node): except Exception as e: self.get_logger().error(f"Failed to determine anchors: {e}") - def handle_imu(self, msg): # Extract yaw from quaternion using transforms3d orientation_q = msg.orientation @@ -174,42 +212,58 @@ class UWBNode(Node): # self.get_logger().info(f"IMU Yaw: {yaw:.2f} radians") def determine_anchor_coords(self, measured_distances): - measured_distances = np.array(measured_distances) + y_A = measured_distances[2] # Distance from A to D + + # Guess for B based on distance A-B and B-D + x_B = measured_distances[0] / 2 + y_B = measured_distances[4] / 2 + 200 # Start y_B above y_C + + # Guess for C with symmetrical logic + x_C = -measured_distances[5] / 2 # Allow for negative x_C + y_C = -measured_distances[1] / 2 # Allow for negative y_C - initial_guess = [120, 100, 150, 200, 50] - bounds = ([0, 0, 0, 0, 0], [1000, 1000, 1000, 1000, 1000]) + initial_guess = [x_B, y_B, x_C, y_C, y_A] + + min_dist = min(measured_distances) + max_dist = max(measured_distances) + lower_bounds = [-max_dist, -max_dist, -max_dist, -max_dist, min_dist / 2] + upper_bounds = [max_dist * 1.5 for i in range(5)] def error_function(variables, measured): x_B, y_B, x_C, y_C, y_A = variables - a_calc = np.sqrt((x_B - 0)**2 + (y_B - y_A)**2) - b_calc = np.sqrt((x_C - x_B)**2 + (y_C - y_B)**2) - c_calc = np.sqrt(x_C**2 + y_C**2) - d_calc = y_A - e_calc = np.sqrt(x_C**2 + (y_C - y_A)**2) - f_calc = np.sqrt(x_B**2 + y_B**2) - - return [ - a_calc - measured[0], - b_calc - measured[3], - c_calc - measured[5], - d_calc - measured[2], - e_calc - measured[1], - f_calc - measured[4] - ] - - result = least_squares( - error_function, - initial_guess, - args=(measured_distances,), - bounds=bounds, - loss='soft_l1' - ) + + # Map measured distances to a, e, d, b, f, c + a_measured, e_measured, d_measured, b_measured, f_measured, c_measured = measured + + # Compute each distance + a_calc = np.sqrt((x_B - 0)**2 + (y_B - y_A)**2) # A-B + b_calc = np.sqrt((x_C - x_B)**2 + (y_C - y_B)**2) # B-C + c_calc = np.sqrt(x_C**2 + y_C**2) # C-D + d_calc = y_A # A-D + e_calc = np.sqrt(x_C**2 + (y_C - y_A)**2) # A-C + f_calc = np.sqrt(x_B**2 + y_B**2) # B-D + + # Residuals + r_a = a_calc - a_measured + r_b = b_calc - b_measured + r_c = c_calc - c_measured + r_d = d_calc - d_measured + r_e = e_calc - e_measured + r_f = f_calc - f_measured + + # Add a smoother penalty if y_B <= y_C + penalty = 1e3 * max(0, y_C - y_B + 10) # Soft penalty to enforce constraint + + return [r_a, r_b, r_c, r_d, r_e, r_f, penalty] + + result = least_squares(error_function, initial_guess, args=(measured_distances,), bounds=(lower_bounds, upper_bounds), loss='soft_l1') + x_B, y_B, x_C, y_C, y_A = result.x self.anchors = { - "A1": (0, y_A), - "A2": (x_B, y_B), - "A3": (x_C, y_C), - "A4": (0, 0) + "A1": (0, y_A, self.anchorHeight), + "A2": (x_B, y_B, self.anchorHeight), + "A3": (x_C, y_C, self.anchorHeight), + "A4": (0, 0, self.anchorHeight) } def calculate_tag_coordinates(self, tag_distances): @@ -227,6 +281,27 @@ class UWBNode(Node): if len(available_beacons) < 3: return None + + heights = [] + for (bx, by, bz), d_measured in zip(available_beacons, available_distances): + horizontal_distance = np.sqrt((bx - 0)**2 + (by - 0)**2) # Assume tag starts at (0, 0) + estimated_z = np.sqrt(max(0, d_measured**2 - horizontal_distance**2)) # Ensure non-negative + heights.append(bz - estimated_z) # Estimate tag height relative to beacon + + initial_z = max(0, min(self.anchorHeight, np.mean(heights))) # Constrain height to [0, 200] + + beacon_xs = [b[0] for b in available_beacons] + beacon_ys = [b[1] for b in available_beacons] + initial_guess = [np.mean(beacon_xs), np.mean(beacon_ys), initial_z] + + # Define bounds for the tag position + x_min = min(beacon_xs) - 1000 # Add a small margin around the beacons + x_max = max(beacon_xs) + 1000 + y_min = min(beacon_ys) - 1000 + y_max = max(beacon_ys) + 1000 + z_min = 0.0 # Tag's height cannot be below the ground + z_max = self.anchorHeight # Tag's height cannot exceed the beacon height + bounds = ([x_min, y_min, z_min], [x_max, y_max, z_max]) def error_function(vars): x, y = vars @@ -236,15 +311,6 @@ class UWBNode(Node): residuals.append(d_computed - d_measured) return residuals - beacon_xs = [b[0] for b in available_beacons] - beacon_ys = [b[1] for b in available_beacons] - initial_guess = [np.mean(beacon_xs), np.mean(beacon_ys)] - - bounds = ( - [min(beacon_xs) - 100, min(beacon_ys) - 100], - [max(beacon_xs) + 100, max(beacon_ys) + 100] - ) - result = least_squares(error_function, initial_guess, bounds=bounds, loss='soft_l1') # self.get_logger().info(f"Optimization result: {result.x}") return tuple(result.x) @@ -253,7 +319,7 @@ class UWBNode(Node): self.get_logger().info("Updating positions...") if self.anchors_coords_known: try: - ranging_distances = self.serial_buffer.getRangingDistances() + ranging_distances = self.serial_controller.getRangingDistances() # self.get_logger().info(f"Ranging distances: {ranging_distances}") self.get_logger().warning("Ranging distances: " + str(ranging_distances)) tag1_distances = ranging_distances[0] @@ -265,10 +331,14 @@ class UWBNode(Node): } tag1_position = self.calculate_tag_coordinates(tag_distances_dict) + + + + if tag1_position is not None: self.current_tag1_position = tag1_position self.get_logger().info(f"Tag 1 position: {tag1_position}") - position_msg = Point(x=tag1_position[0], y=tag1_position[1], z=0.0) + position_msg = Point(x=tag1_position[0], y=tag1_position[1], z=tag1_position[2]) self.publisher.publish(position_msg) if self.mode == "Follow": @@ -286,7 +356,7 @@ class UWBNode(Node): self.get_logger().info(f"Tag 1 position: {tag1_position}") self.get_logger().info(f"Tag 2 position: {tag2_position}") self.current_tag2_position = tag2_position - target_msg = Point(x=tag2_position[0], y=tag2_position[1], z=0.0) + target_msg = Point(x=tag2_position[0], y=tag2_position[1], z=tag2_position[2]) self.target_location_pub.publish(target_msg) # Compute offset if not yet computed @@ -310,7 +380,6 @@ class UWBNode(Node): self.start_publisher.publish(position_msg) self.get_logger().info(f"Current position: {self.current_tag1_position} Success!") - # Publish target position target_msg = Point(x=target_pos[0], y=target_pos[1], z=0.0) self.target_location_pub.publish(target_msg)