diff --git a/ros2/src/robobin/launch/robobin_launch.py b/ros2/src/robobin/launch/robobin_launch.py index 53efd3a4b835767406daa482be80cea39858c823..50e80250ab233bc37ef956ae088274e6912ecc68 100755 --- a/ros2/src/robobin/launch/robobin_launch.py +++ b/ros2/src/robobin/launch/robobin_launch.py @@ -13,14 +13,6 @@ def generate_launch_description(): emulate_tty=True ), - - Node( - package='robobin', - executable='uwb_navigation_node', - name='uwb_navigation_node', - output='screen', - emulate_tty=True - ), Node( package='robobin', @@ -43,13 +35,7 @@ def generate_launch_description(): output='screen', emulate_tty=True ), - Node( - package='robobin', - executable='uwb_pathing_node', - name='uwb_pathing_node', - output='screen', - emulate_tty=True - ), + # Add additional nodes # Example: diff --git a/ros2/src/robobin/robobin/api_node.py b/ros2/src/robobin/robobin/api_node.py index 14063fc3b94f477445f6a081459d257a54f3bb56..2f05d1383f0a274db9a4acb2b07ff8bc21362cf9 100644 --- a/ros2/src/robobin/robobin/api_node.py +++ b/ros2/src/robobin/robobin/api_node.py @@ -107,6 +107,7 @@ class ApiNode(Node): if result is not None: topic, message = result # Safely unpack after checking if topic is not None: + self.get_logger().info(f"Received message: {message}") self.get_logger().info(f"Publishing to topic: {topic}") self.publish_to_topic(topic, message) except Exception as e: diff --git a/ros2/src/robobin/robobin/helpers/message_handler.py b/ros2/src/robobin/robobin/helpers/message_handler.py index 3182da3bd7e274b158b5f3003f77798ee748a734..0b61cab31a77cc5240dd366ffe8bf72746e8b728 100644 --- a/ros2/src/robobin/robobin/helpers/message_handler.py +++ b/ros2/src/robobin/robobin/helpers/message_handler.py @@ -16,8 +16,19 @@ class MessageHandler: "CALLME": self.handle_call_me, "BPM": self.handle_call_bpm, "REQMAP": self.handle_request_map, - "LOCATION": self.handle_request_location + "LOCATION": self.handle_request_location, + "CALLMELIDAR": self.handle_call_me_lidar } + def handle_call_me_lidar(self, client_socket, message): + #CALLMELIDAR 1 (for example) + response = b"Requested Number is " + message.encode() + if self.testing: + print(response.decode()) + else: + client_socket.sendall(response) + return "nav_command", "CALLMELIDAR " + message + + def handle_request_location(self, client_socket, message): """Handles the LOC command.""" response = f"location is {self.api_node.connection_manager.location}".encode() diff --git a/ros2/src/robobin/robobin/uwb_navigation_node.py b/ros2/src/robobin/robobin/uwb_navigation_node.py deleted file mode 100644 index 21767f41fc4ca24e576017fa7aaf1132242df8ef..0000000000000000000000000000000000000000 --- a/ros2/src/robobin/robobin/uwb_navigation_node.py +++ /dev/null @@ -1,677 +0,0 @@ -import json -import os -import numpy as np -from scipy.optimize import least_squares -from geometry_msgs.msg import Point, Twist -import rclpy -from rclpy.node import Node -from std_msgs.msg import String -from sensor_msgs.msg import Imu -from tf.transformations import euler_from_quaternion - -import math - -class SerialController: - def __init__(self, port, windowSize = 10, minSuccessfulValues = 5, maxRetries = 20): - self.ser = serial.Serial(port, 115200, timeout = 1) - self.windowSize = windowSize - self.minSuccessfulValues = minSuccessfulValues - self.maxRetries = maxRetries - self.calibration_active = False - self.calibration_timer = None - self.calib_start_position = None - self.calib_start_yaw = None - - - - self.tag1Window = [[] for _ in range(4)] - self.tag2Window = [[] for _ in range(4)] - - def readFromDevice(self, expectedLines = 1): - lines = [] - while self.ser.in_waiting or len(lines) < expectedLines: - lines.append(self.ser.readline().decode()) - return list(map(lambda line: line.strip(), lines)) - - def getBPMOnce(self): - self.writeToDevice("bpm") - buffer = self.readFromDevice(1)[0].split(" ") - print(buffer) - if len(buffer) == 6: #received all bpm distances - return list(map(float, buffer)) - elif len(buffer) == 4: #live check saying which anchors are unreachable - return list(map(lambda x: bool(int(x)), buffer)) - else: - return [] - # return int(buffer[0]) if len(buffer) else None - - def getBeaconPositioningDistances(self): - bpmWindow = [] - successfulValues = 0 - currentTry = 0 - anchorsOnline = [] - while (successfulValues < self.minSuccessfulValues) and (currentTry < self.maxRetries): - values = self.getBPMOnce() - if len(values) == 4: #bpm returned live check, some anchors are offline - anchorsOnline = values - if len(values) == 6: #bpm distances retrieved - if all(map(lambda x: x >= 0.0, values)): #ensure all values are non-negative - bpmWindow.append(values) - successfulValues += 1 - currentTry += 1 - - if currentTry >= self.maxRetries: - return anchorsOnline - else: - return [sum(map(lambda bpmEntry: bpmEntry[i], bpmWindow)) / len(bpmWindow) for i in range(6)] - - def getRNGOnce(self): - self.writeToDevice("rng") - lines = self.readFromDevice(2) - distances = [] - distances.append(list(map(float, lines[0].split(" ")))) - if len(lines[1]) > 1: - distances.append(list(map(float, lines[1].split(" ")))) - else: - distances.append(None) - return distances - - def getRangingDistances(self): - tag1Distances, tag2Distances = self.getRNGOnce() - if (all(map(lambda x: x >= 0.0, tag1Distances))): #all values are non-negative - if len(self.tag1Window[0]) < self.windowSize: - for i, dist in enumerate(tag1Distances): - (self.tag1Window[i]).append(dist) - else: - for i, dist in enumerate(tag1Distances): - self.tag1Window[i] = self.tag1Window[i][1:] + [dist] - - if tag2Distances == None: - self.tag2Window = [[] for _ in range(4)] - else: - if (all(map(lambda x: x >= 0.0, tag2Distances))): #all values are non-negative - if len(self.tag2Window[0]) < self.windowSize: - for i, dist in enumerate(tag2Distances): - self.tag2Window[i].append(dist) - else: - for i, dist in enumerate(tag2Distances): - self.tag2Window[i] = self.tag2Window[i][1:] + [dist] - - return [[(sum(distArray) / (len(distArray) - distArray.count(0.0)) if (len(distArray) - distArray.count(0.0)) else 0.0) for distArray in self.tag1Window], [(sum(distArray) / (len(distArray) - distArray.count(0.0)) if (len(distArray) - distArray.count(0.0)) else 0.0) for distArray in self.tag2Window] if len(self.tag2Window[0]) else None] - - def writeToDevice(self, value): - self.ser.write((value + "\n").encode()) - - def __del__(self): - print("Closing port") - self.ser.close() - -class UWBNode(Node): - def __init__(self): - super().__init__('uwb_navigation_node') - - self.publisher = self.create_publisher(Point, '/tag1_location', 10) - self.start_publisher = self.create_publisher(Point, '/start_location', 10) - self.target_location_pub = self.create_publisher(Point, '/target_location', 10) - self.api_pub = self.create_publisher(String, '/api_command', 10) - - self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10) - - self.subscription = self.create_subscription(String, '/nav_command', self.handle_nav_command, 10) - self.imu_subscription = self.create_subscription(Imu, '/imu', self.handle_imu, 10) - - self.current_yaw = 0.0 - self.uwb_to_imu_offset = 0.0 - self.mode = "Manual" - self.serial_controller = SerialController("/dev/ttyACM0") - self.anchors = {} - self.anchorHeight = 250 - self.anchors_coords_known = False - self.previous_tag1_position = None - - self.windowSize = 10 - self.tag1Window = [] - self.tag2Window = [] - - self.current_tag1_position = None - self.current_tag2_position = None - - # Graph data for call mode navigation - self.graph_data = None - - # For Follow mode: - self.follow_offset_computed = False # True once we have computed initial heading offset - self.follow_target_reached = False - - # For Call mode: - self.path = [] - self.current_target_index = 0 # Index in the path - self.call_mode_active = False - workspace_root = os.getcwd() # Current working directory - anchors_file_path = os.path.abspath(os.path.join(workspace_root, "src", "robobin", "robobin", "graphs", "anchors.json")) - if os.path.exists(anchors_file_path): - try: - with open(anchors_file_path, 'r') as f: - anchors_data = json.load(f) - self.anchors = anchors_data - self.anchors_coords_known = True - self.get_logger().info("Anchor coordinates loaded from anchors.json.") - except Exception as e: - self.get_logger().error(f"Failed to load anchors.json: {e}") - self.anchors_coords_known = False - else: - self.get_logger().warning("anchors.json not found, anchor coordinates not known.") - self.anchors_coords_known = False - - # Timer to continuously fetch and publish positions - self.timer = self.create_timer(1.0, self.update_positions) - - def call_bpm(self): - try: - # Retrieve beacon distances and determine anchor coordinates - beacon_distances = self.serial_controller.getBeaconPositioningDistances() - self.get_logger().info(f"Retrieved values {beacon_distances}") - self.determine_anchor_coords(beacon_distances) - self.anchors_coords_known = True - self.get_logger().info("Anchor coordinates determined.") - - # Define workspace path once - workspace_root = os.getcwd() - anchors_file_path = os.path.abspath(os.path.join(workspace_root, "src", "robobin", "robobin", "graphs", "anchors.json")) - graph_file_path = os.path.abspath(os.path.join(workspace_root, "src", "robobin", "robobin", "graphs", "graph.json")) - - try: - # Save anchors to anchors.json - with open(anchors_file_path, 'w') as f: - json.dump(self.anchors, f) - self.get_logger().info("Anchor coordinates saved to anchors.json.") - - # Construct graph structure - graph = { - "name": "Lab 1 Extended", - "nodes": [], - "connections": [] - } - - # Populate nodes with anchor data - anchor_names = ["A1", "A2", "A3", "A4"] - for i, (key, coords) in enumerate(self.anchors.items()): - graph["nodes"].append({"name": anchor_names[i], "x": coords[0], "y": coords[1]}) - - # Create a fully connected adjacency matrix (all nodes connected) - num_nodes = len(graph["nodes"]) - graph["connections"] = [[True if i != j else False for j in range(num_nodes)] for i in range(num_nodes)] - - # Save graph to graph.json - with open(graph_file_path, 'w') as f: - json.dump(graph, f, indent=2) - self.get_logger().info("Graph saved to graph.json.") - - except Exception as e: - self.get_logger().error(f"Failed to save graph or anchors: {e}") - - except Exception as e: - self.get_logger().error(f"Failed to determine anchors: {e}") - - def handle_imu(self, msg): - # Extract the quaternion from the IMU message - qx = msg.orientation.x - qy = msg.orientation.y - qz = msg.orientation.z - qw = msg.orientation.w - - # Convert quaternion to Euler angles (roll, pitch, yaw) - # euler_from_quaternion expects [x, y, z, w] - (roll, pitch, yaw) = euler_from_quaternion([qx, qy, qz, qw]) - - # Store yaw (in radians) for usage in your node - self.current_yaw = yaw - - # Optional: Print or log if you want to see the values - self.get_logger().info(f"IMU yaw (rad): {yaw:.3f}") - - - def determine_anchor_coords(self, measured_distances): - y_A = measured_distances[2] # Distance from A to D - - # Guess for B based on distance A-B and B-D - x_B = measured_distances[0] / 2 - y_B = measured_distances[4] / 2 + 200 # Start y_B above y_C - - # Guess for C with symmetrical logic - x_C = -measured_distances[5] / 2 # Allow for negative x_C - y_C = -measured_distances[1] / 2 # Allow for negative y_C - - initial_guess = [x_B, y_B, x_C, y_C, y_A] - - min_dist = min(measured_distances) - max_dist = max(measured_distances) - lower_bounds = [-max_dist, -max_dist, -max_dist, -max_dist, min_dist / 2] - upper_bounds = [max_dist * 1.5 for i in range(5)] - - def error_function(variables, measured): - x_B, y_B, x_C, y_C, y_A = variables - - # Map measured distances to a, e, d, b, f, c - a_measured, e_measured, d_measured, b_measured, f_measured, c_measured = measured - - # Compute each distance - a_calc = np.sqrt((x_B - 0)**2 + (y_B - y_A)**2) # A-B - b_calc = np.sqrt((x_C - x_B)**2 + (y_C - y_B)**2) # B-C - c_calc = np.sqrt(x_C**2 + y_C**2) # C-D - d_calc = y_A # A-D - e_calc = np.sqrt(x_C**2 + (y_C - y_A)**2) # A-C - f_calc = np.sqrt(x_B**2 + y_B**2) # B-D - - # Residuals - r_a = a_calc - a_measured - r_b = b_calc - b_measured - r_c = c_calc - c_measured - r_d = d_calc - d_measured - r_e = e_calc - e_measured - r_f = f_calc - f_measured - - # Add a smoother penalty if y_B <= y_C - penalty = 1e3 * max(0, y_C - y_B + 10) # Soft penalty to enforce constraint - - return [r_a, r_b, r_c, r_d, r_e, r_f, penalty] - - result = least_squares(error_function, initial_guess, args=(measured_distances,), bounds=(lower_bounds, upper_bounds), loss='soft_l1') - - x_B, y_B, x_C, y_C, y_A = result.x - self.anchors = { - "A1": (0, y_A, self.anchorHeight), - "A2": (x_B, y_B, self.anchorHeight), - "A3": (x_C, y_C, self.anchorHeight), - "A4": (0, 0, self.anchorHeight) - } - - def calculate_tag_coordinates(self, tag_distances): - if not self.anchors_coords_known or len(self.anchors) < 3: - self.get_logger().warning("Anchor coordinates not known.") - return None - - available_beacons = [] - available_distances = [] - - for key, dist in tag_distances.items(): - if dist > 0: - available_distances.append(dist) - available_beacons.append(self.anchors[key]) - - if len(available_beacons) < 3: - return None - - heights = [] - for (bx, by, bz), d_measured in zip(available_beacons, available_distances): - horizontal_distance = np.sqrt((bx - 0)**2 + (by - 0)**2) # Assume tag starts at (0, 0) - estimated_z = np.sqrt(max(0, d_measured**2 - horizontal_distance**2)) # Ensure non-negative - heights.append(bz - estimated_z) # Estimate tag height relative to beacon - - initial_z = max(0, min(self.anchorHeight, np.mean(heights))) # Constrain height to [0, 200] - - beacon_xs = [b[0] for b in available_beacons] - beacon_ys = [b[1] for b in available_beacons] - initial_guess = [np.mean(beacon_xs), np.mean(beacon_ys), initial_z] - - # Define bounds for the tag position - x_min = min(beacon_xs) - 1000 # Add a small margin around the beacons - x_max = max(beacon_xs) + 1000 - y_min = min(beacon_ys) - 1000 - y_max = max(beacon_ys) + 1000 - z_min = 0.0 # Tag's height cannot be below the ground - z_max = self.anchorHeight # Tag's height cannot exceed the beacon height - bounds = ([x_min, y_min, z_min], [x_max, y_max, z_max]) - - def error_function(vars): - x, y, z = vars - residuals = [] - for (bx, by, bz), d_measured in zip(available_beacons, available_distances): - d_computed = np.sqrt((x - bx)**2 + (y - by)**2 + (z - bz)**2) - residuals.append(d_computed - d_measured) - return residuals - - result = least_squares(error_function, initial_guess, bounds=bounds, loss='soft_l1') - # self.get_logger().info(f"Optimization result: {result.x}") - return tuple(result.x) - - def update_positions(self): - self.get_logger().info("Updating positions...") - if self.anchors_coords_known: - try: - ranging_distances = self.serial_controller.getRangingDistances() - # self.get_logger().info(f"Ranging distances: {ranging_distances}") - self.get_logger().warning("Ranging distances: " + str(ranging_distances)) - tag1_distances = ranging_distances[0] - tag_distances_dict = { - "A1": tag1_distances[0], - "A2": tag1_distances[1], - "A3": tag1_distances[2], - "A4": tag1_distances[3] - } - - tag1_position = self.calculate_tag_coordinates(tag_distances_dict) - - if tag1_position: - if len(self.tag1Window < self.windowSize): - self.tag1Window.append(tag1_position) - else: - self.tag1Window = self.tag1Window[1:] + [tag1_position] - - self.current_tag1_position = tuple(map(lambda li: sum(li) / len(li), zip(self.tag1Window))) - - self.get_logger().info(f"Tag 1 position: {tag1_position}") - position_msg = Point(x=self.current_tag1_position[0], y=self.current_tag1_position[1], z=self.current_tag1_position[2]) - self.publisher.publish(position_msg) - - if self.mode == "Follow": - self.get_logger().info("Follow mode active.") - tag2_distances = ranging_distances[1] - if tag2_distances: - tag2_distances_dict = { - "A1": tag2_distances[0], - "A2": tag2_distances[1], - "A3": tag2_distances[2], - "A4": tag2_distances[3] - } - tag2_position = self.calculate_tag_coordinates(tag2_distances_dict) - if tag2_position: - if len(self.tag2Window < self.windowSize): - self.tag2Window.append(tag2_position) - else: - self.tag2Window = self.tag2Window[1:] + [tag2_position] - - self.current_tag2_position = tuple(map(lambda li: sum(li) / len(li), zip(self.tag2Window))) - - self.get_logger().info(f"Tag 1 position: {self.current_tag1_position}") - self.get_logger().info(f"Tag 2 position: {self.current_tag2_position}") - target_msg = Point(x=self.current_tag2_position[0], y=self.current_tag2_position[1], z=self.current_tag2_position[2]) - self.target_location_pub.publish(target_msg) - - # Compute offset if not yet computed - if not self.follow_offset_computed and self.current_yaw is not None and self.current_tag1_position is not None: - self.uwb_to_imu_offset = self.compute_heading_offset(self.current_tag1_position, self.current_tag2_position, self.current_yaw) - self.follow_offset_computed = True - else: - self.tag2Window = [] - else: - self.get_logger().warning("Tag 2 not known") - self.tag2Window = [] - - if self.mode == "Call" and self.call_mode_active: - self.get_logger().info("Call mode active.") - if self.current_tag1_position is not None and self.current_target_index < len(self.path): - self.get_logger().info(f"Current target index: {self.current_target_index}") - target_node_name = self.path[self.current_target_index] - target_pos = self.get_node_position(target_node_name) - self.get_logger().info(f"Target position: {target_pos}") - # Publish current position - position_msg = Point(x=self.current_tag1_position[0], y=self.current_tag1_position[1], z=0.0) - self.start_publisher.publish(position_msg) - self.get_logger().info(f"Current position: {self.current_tag1_position} Success!") - - # Publish target position - target_msg = Point(x=target_pos[0], y=target_pos[1], z=0.0) - self.target_location_pub.publish(target_msg) - self.get_logger().info(f"Target position: {target_pos} Success!") - # Check if close to the node - dist = math.sqrt((self.current_tag1_position[0] - target_pos[0])**2 + - (self.current_tag1_position[1] - target_pos[1])**2) - if dist < 10: # Threshold of 5 cm - self.current_target_index += 1 - if self.current_target_index >= len(self.path): - self.get_logger().info("Call mode: Reached final destination.") - self.api_pub.publish(String(data="CALLEND")) - else: - self.get_logger().info(f"Moving to next node: {self.path[self.current_target_index]}") - - except Exception as e: - self.get_logger().error(f"Error updating positions: {e}") -#ros2\src\robobin\robobin\graphs\graph.json - def clear_graph(self): - self.anchors_coords_known = False - self.anchors = {} - self.previous_tag1_position = None - workspace_root = os.getcwd() # Current working directory - graph_file_path = os.path.abspath(os.path.join(workspace_root, "src", "robobin", "robobin", "graphs", "graph.json")) - default_file_path = os.path.abspath(os.path.join(workspace_root, "src","robobin", "robobin", "graphs", "default_graph.json")) - try: - with open(default_file_path, 'r') as f: - graph_data = json.load(f) - with open(graph_file_path, 'w') as f: - json.dump(graph_data, f) - except FileNotFoundError: - self.get_logger().error(f"Error: File not found at {default_file_path}") - except json.JSONDecodeError as e: - self.get_logger().error(f"Error: Failed to decode JSON - {str(e)}") - except Exception as e: - self.get_logger().error(f"Error: {str(e)}") - - self.get_logger().info("Graph cleared.") - - def handle_nav_command(self, msg): - self.get_logger().info(f"Raw nav command: {msg.data}") - data = msg.data.strip() - parts = data.split(" ") - self.get_logger().info(f"Received nav command: {data}") - self.get_logger().info(f"Parts: {parts}, length of first part: {len(parts[0])}") - if data == "BPM": - self.clear_graph() - self.call_bpm() - self.anchors_coords_known = True - elif data.startswith("SETMODE"): - mode = parts[1] - stop_cmd = Twist() - self.cmd_pub.publish(stop_cmd) # Publish zero velocity - self.get_logger().info("Motors stopped.") - - if mode in ["Manual", "Follow", "Call","Calibration"]: - self.mode = mode - self.get_logger().info(f"Mode set to: {mode}") - - - if mode == "Manual": - self.call_mode_active = False - self.follow_offset_computed = False - self.follow_target_reached = False - self.get_logger().info("Publishing to pathing node stopped.") - - - stop_cmd = Twist() - self.cmd_pub.publish(stop_cmd) - self.get_logger().info("Motors stopped.") - - elif mode == "Follow": - self.follow_offset_computed = False - self.follow_target_reached = False - self.get_logger().info("Follow mode initialized.") - - elif mode == "Call": - self.call_mode_active = False - self.get_logger().info("Call mode initialized but not active yet.") - elif mode == "Calibration": - self.get_logger().info("Calibration mode initialized.") - self.start_calibration() - else: - self.get_logger().error("Invalid mode for UWB Navigation") - - elif self.mode == "Call" and parts and parts[0] == "CALL": - self.get_logger().info(f"Received CALL command: {data}") - # Example: CALL A1 Node6 - parts = data.split(" ") - if len(parts) < 3: - self.get_logger().error("CALL command requires start and end nodes.") - return - start_location = parts[1] - end_location = parts[2] - self.get_logger().info(f"Call received: Start={start_location}, End={end_location}") - # Load graph - workspace_root = os.getcwd() - graph_file_path = os.path.abspath(os.path.join(workspace_root, "src", "robobin", "robobin", "graphs", "graph.json")) - try: - with open(graph_file_path, 'r') as f: - self.graph_data = json.load(f) - self.get_logger().info("Graph loaded.") - except FileNotFoundError: - self.get_logger().error(f"Error: File not found at {graph_file_path}") - return - except json.JSONDecodeError as e: - self.get_logger().error(f"Error: Failed to decode JSON - {str(e)}") - return - except Exception as e: - self.get_logger().error(f"Error: {str(e)}") - return - - - self.path = self.a_star_search(self.graph_data, start_location, end_location) - if self.path: - self.get_logger().info(f"Path found: {self.path}") - self.current_target_index = 0 - self.call_mode_active = True - else: - self.get_logger().error("No path found.") - - def compute_heading_offset(self, robot_pos, target_pos, imu_yaw): - # Compute heading from UWB positions - dx = target_pos[0] - robot_pos[0] - dy = target_pos[1] - robot_pos[1] - uwb_heading = math.atan2(dy, dx) - offset = imu_yaw - uwb_heading - return offset - - def get_node_position(self, node_name): - if self.graph_data is None: - return None - for node in self.graph_data["nodes"]: - if node["name"] == node_name: - return (node["x"], node["y"]) - return None - - def a_star_search(self, graph_data, start_name, goal_name): - # Parse nodes and connections - nodes = graph_data["nodes"] - connections = graph_data["connections"] - - # Create a mapping from name to index - name_to_index = {node["name"]: i for i, node in enumerate(nodes)} - if start_name not in name_to_index or goal_name not in name_to_index: - self.get_logger().error("Start or Goal node not found in the graph.") - return None - - start_index = name_to_index[start_name] - goal_index = name_to_index[goal_name] - - - def heuristic(n1, n2): - x1, y1 = nodes[n1]["x"], nodes[n1]["y"] - x2, y2 = nodes[n2]["x"], nodes[n2]["y"] - return math.sqrt((x1 - x2)**2 + (y1 - y2)**2) - - open_set = {start_index} - came_from = {} - g_score = {i: float('inf') for i in range(len(nodes))} - g_score[start_index] = 0 - f_score = {i: float('inf') for i in range(len(nodes))} - f_score[start_index] = heuristic(start_index, goal_index) - - while open_set: - current = min(open_set, key=lambda idx: f_score[idx]) - if current == goal_index: - - return self.reconstruct_path(came_from, current, nodes) - - open_set.remove(current) - - for neighbor, connected in enumerate(connections[current]): - if not connected: - continue - tentative_g_score = g_score[current] + heuristic(current, neighbor) - if tentative_g_score < g_score[neighbor]: - came_from[neighbor] = current - g_score[neighbor] = tentative_g_score - f_score[neighbor] = tentative_g_score + heuristic(neighbor, goal_index) - if neighbor not in open_set: - open_set.add(neighbor) - - return None - - def reconstruct_path(self, came_from, current, nodes): - total_path = [nodes[current]["name"]] - while current in came_from: - current = came_from[current] - total_path.insert(0, nodes[current]["name"]) - return total_path - - def start_calibration(self): - """ - When doing calibration, the robobin will drive forward for 2 seconds at 0.25 m/s (WARNING). - """ - if not self.current_tag1_position: - self.get_logger().error("No current UWB position available - cannot calibrate.") - return - - self.calibration_active = True - - - self.calib_start_position = self.current_tag1_position - self.calib_start_yaw = self.current_yaw - - forward_cmd = Twist() - forward_cmd.linear.x = 0.25 - self.cmd_pub.publish(forward_cmd) - self.get_logger().info("Driving forward at 0.25 m/s for 2 seconds...") - - - self.calibration_timer = self.create_timer(2.0, self.finish_calibration) - - def finish_calibration(self): - - - if self.calibration_timer: - self.destroy_timer(self.calibration_timer) - self.calibration_timer = None - - stop_cmd = Twist() - self.cmd_pub.publish(stop_cmd) - - if not self.current_tag1_position: - self.get_logger().error("No final UWB position available - calibration failed.") - self.calibration_active = False - return - - calib_end_position = self.current_tag1_position - calib_end_yaw = self.current_yaw - - - dx = calib_end_position[0] - self.calib_start_position[0] - dy = calib_end_position[1] - self.calib_start_position[1] - uwb_heading = math.atan2(dy, dx) - - offset = calib_end_yaw - uwb_heading - self.uwb_to_imu_offset = offset - - - self.get_logger().info("CALIBRATION COMPLETE!") - self.get_logger().info(f" Start Pos (UWB): {self.calib_start_position}") - self.get_logger().info(f" End Pos (UWB): {calib_end_position}") - self.get_logger().info(f" UWB heading: {uwb_heading:.3f} rad") - self.get_logger().info(f" IMU yaw: {calib_end_yaw:.3f} rad") - self.get_logger().info(f" --> OFFSET: {offset:.3f} rad") - - - self.calibration_active = False - self.mode = "Manual" - -def main(args=None): - rclpy.init(args=args) - node = UWBNode() - try: - rclpy.spin(node) - except KeyboardInterrupt: - pass - finally: - node.destroy_node() - rclpy.shutdown() - -if __name__ == "__main__": - main() diff --git a/ros2/src/robobin/robobin/uwb_pathing_node.py b/ros2/src/robobin/robobin/uwb_pathing_node.py deleted file mode 100644 index 00fc1b498f4e752d4c447257a718e56f52858d9c..0000000000000000000000000000000000000000 --- a/ros2/src/robobin/robobin/uwb_pathing_node.py +++ /dev/null @@ -1,122 +0,0 @@ -import rclpy -from rclpy.node import Node -from geometry_msgs.msg import Twist, Point -from math import sqrt, pow, atan2, radians, sin, cos -import time - - -class UWBPathingNode(Node): - def __init__(self): - super().__init__('uwb_pathing_node') - - # Publisher for cmd_vel - self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10) - - # Subscriptions - self.create_subscription(Point, '/start_location', self.current_location_callback, 10) - self.create_subscription(Point, '/target_location', self.target_location_callback, 10) - - # Current and target positions - self.current_x = None - self.current_y = None - self.target_x = None - self.target_y = None - - # Navigation thresholds - self.distance_threshold = 0.25 # meters - self.angle_threshold = radians(0.5) # radians - - self.kp_linear = 0.5 # Proportional gain for linear movement - self.kp_angular = 1.0 # Proportional gain for angular movement - - self.get_logger().info('UWB Pathing Node started and waiting for positions.') - - def current_location_callback(self, msg: Point): - """Callback to update the robot's current position.""" - self.current_x = msg.x - self.current_y = msg.y - self.get_logger().info(f"Current Location Updated: x={self.current_x:.2f}, y={self.current_y:.2f}") - self.check_and_navigate() - - def target_location_callback(self, msg: Point): - """Callback to update the target position.""" - self.target_x = msg.x - self.target_y = msg.y - self.get_logger().info(f"Target Location Updated: x={self.target_x:.2f}, y={self.target_y:.2f}") - self.check_and_navigate() - - def check_and_navigate(self): - """Check if both positions are available and navigate to the target.""" - if self.current_x is not None and self.current_y is not None and self.target_x is not None and self.target_y is not None: - self.get_logger().info("Navigating to target...") - self.navigate_to_target() - else: - self.get_logger().warning("Waiting for both current and target positions to be available...") - - def navigate_to_target(self): - # Ensure positions are known - if self.current_x is None or self.current_y is None or self.target_x is None or self.target_y is None: - self.get_logger().warning("Positions not fully known yet.") - return - - # Calculate distance and angle to the target - displacement_x = self.target_x - self.current_x - displacement_y = self.target_y - self.current_y - distance_to_target = sqrt(pow(displacement_x, 2) + pow(displacement_y, 2)) - angle_to_target = atan2(displacement_y, displacement_x) - - - # Check if we are close enough to the target - if distance_to_target <= self.distance_threshold: - self.stop_robot() - self.get_logger().info("Target reached successfully.") - return - - # Calculate yaw error (assuming robot orientation 0 = facing x-axis) - - yaw_error = angle_to_target - yaw_error = atan2(sin(yaw_error), cos(yaw_error)) # Normalize angle - self.get_logger().info(f"Current Position: x={self.current_x:.2f}, y={self.current_y:.2f}") - self.get_logger().info(f"Target Position: x={self.target_x:.2f}, y={self.target_y:.2f}") - self.get_logger().info(f"Distance to Target: distance_to_target={distance_to_target:.2f} meters") - self.get_logger().info(f"Angle to Target: angle_to_target={angle_to_target:.2f} radians") - twist = Twist() - - # Decide on angular velocity first - if abs(yaw_error) > self.angle_threshold: - angular_speed = self.kp_angular * abs(yaw_error) - twist.angular.z = angular_speed if yaw_error > 0 else -angular_speed - self.get_logger().info(f"Correcting Heading: yaw_error={yaw_error:.2f} radians") - else: - # Move forward when aligned with the target - linear_speed = self.kp_linear * distance_to_target - twist.linear.x = min(0.2, linear_speed) # Limit max speed - self.get_logger().info(f"Moving to Target: distance={distance_to_target:.2f} meters") - - # # Publish movement command - # self.cmd_vel_pub.publish(twist) - - - def stop_robot(self): - """Stops the robot by publishing zero velocities.""" - twist = Twist() - self.cmd_vel_pub.publish(twist) - time.sleep(0.5) - self.get_logger().info("Robot stopped.") - - -def main(args=None): - rclpy.init(args=args) - node = UWBPathingNode() - - try: - rclpy.spin(node) - except KeyboardInterrupt: - pass - finally: - node.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/ros2/src/robobin/setup.py b/ros2/src/robobin/setup.py index b32395395586e8c9b6388eb6d76f649819af4727..489980a1de2829a169ae52292f59fee539b65ffa 100644 --- a/ros2/src/robobin/setup.py +++ b/ros2/src/robobin/setup.py @@ -26,12 +26,10 @@ setup( 'console_scripts': [ 'api_node = robobin.api_node:main', 'motor_control_node = robobin.motor_control_node:main', - 'uwb_navigation_node = robobin.uwb_navigation_node:main', 'imu_node = robobin.imu_node:main', 'motor_node = robobin.motor_control_node:main', 'encoder_node = robobin.encoder:main', 'control_feedback = robobin.control_feedback:main', - 'uwb_pathing_node = robobin.uwb_pathing_node:main', ], }, )