From 04ec9daa8b373f62315308b7ea01c3b593e9099f Mon Sep 17 00:00:00 2001 From: Paul-Winpenny <92634321+Paul-Winpenny@users.noreply.github.com> Date: Tue, 10 Dec 2024 18:52:33 +0000 Subject: [PATCH] Try this with serial gui. --- .../realtime_location_with_serial_gui.py | 305 ++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 Wireless_Communication/UWB/Beacons_tag_position/realtime_location_with_serial_gui.py diff --git a/Wireless_Communication/UWB/Beacons_tag_position/realtime_location_with_serial_gui.py b/Wireless_Communication/UWB/Beacons_tag_position/realtime_location_with_serial_gui.py new file mode 100644 index 00000000..01eae227 --- /dev/null +++ b/Wireless_Communication/UWB/Beacons_tag_position/realtime_location_with_serial_gui.py @@ -0,0 +1,305 @@ +import tkinter as tk +from tkinter import ttk +from scipy.optimize import least_squares +import numpy as np +import serial, time +import threading +class SerialBuffer: + def __init__(self, port): + self.ser = serial.Serial(port, 115200) + + def readFromDevice(self, expectedLines = 1): + lines = [] + while self.ser.in_waiting or len(lines) < expectedLines: + lines.append(self.ser.readline().decode()) + return list(map(lambda line: line.strip(), lines)) + + def getBeaconPositioningDistances(self): + self.writeToDevice("bpm") + buffer = self.readFromDevice(1)[0] + values = list(map(float, buffer.split(" "))) + return values + + def getRangingDistances(self): + self.writeToDevice("rng") + lines = self.readFromDevice(2) + distances = [] + distances.append(list(map(float, lines[0][1:].split(" ")))) + if lines[1] != "0": + distances.append(list(map(float, lines[1][1:].split(" ")))) + else: + distances.append(None) + return distances + + def writeToDevice(self, value): + self.ser.write((value + "\n").encode()) + + def __del__(self): + print("Closing port") + self.ser.close() + + +class AnchorTagGUI: + def __init__(self, root): + self.root = root + self.root.title("Anchor and Tag Visualization") + self.root.resizable(False, False) + self.serial_buffer = SerialBuffer("COM5") # Initialize SerialBuffer + self.running = True # To stop the thread safely + self.start_background_thread() + + self.root.configure(bg='navy blue') + self.left_frame = tk.Frame(root) + self.left_frame.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10) + + self.right_frame = tk.Frame(root) + self.right_frame.pack(side=tk.RIGHT, fill=tk.Y, padx=10, pady=10) + + self.canvas = tk.Canvas(root, width=600, height=600, bg="lightgray") + self.canvas.pack(side=tk.LEFT, padx=10, pady=10) + + + ttk.Label(self.right_frame, text="Enter distances from tag to anchors:").pack() + self.tag_distances = {} # Distances from the tag to specific anchors + for anchor in ["A1", "A2", "A3", "A4"]: + ttk.Label(self.right_frame, text=f"Distance to {anchor}:").pack() + self.tag_distances[anchor] = tk.StringVar() + ttk.Entry(self.right_frame, textvariable=self.tag_distances[anchor]).pack() + + + self.calc_button = ttk.Button(self.right_frame, text="Calculate Tag Position", command=self.calculate_tag_position) + self.calc_button.pack() + + + self.output_label = ttk.Label(self.right_frame, text="") + self.output_label.pack() + + self.anchors = {} + self.measured_distances = [tk.DoubleVar(value=200.22), tk.DoubleVar(value=200.47), tk.DoubleVar(value=170.00), tk.DoubleVar(value=170.71), tk.DoubleVar(value=150.00), tk.DoubleVar(value=160.08)] # A, E, D, B, F, C + for i, anchor in enumerate(["a", "e", "d", "b", "f", "c"]): + ttk.Label(self.right_frame, text=f"Distance {anchor}:").pack() + ttk.Entry(self.right_frame, textvariable=self.measured_distances[i]).pack() + + + + self.generate_anchors_button = ttk.Button( + self.right_frame, text="Generate Anchor Coordinates", command=self.determine_anchor_coords + ) + self.generate_anchors_button.pack() + + def determine_anchor_coords(self): + try: + measured_distances = self.measured_distances + + # Measured distances arrive in order: A, E, D, B, F, C + + measured_distances = [var.get() for var in measured_distances] + + measured_distances = np.array(measured_distances) + # Introduce ±10 cm of noise + noise_level = 10.0 + measured_distances_noisy = measured_distances + np.random.uniform(-noise_level, noise_level, size=len(measured_distances)) + + # Variables: x_B, y_B, x_C, y_C, y_A + # Initial guess (adjust as needed) + initial_guess = [-200, -900, 400, 900, 800] # [x_B, y_B, x_C, y_C, y_A] + + # Bounds for all variables from -10000 to 10000 + bounds = ([-10000, -10000, -10000, -10000, -10000], + [10000, 10000, 10000, 10000, 10000]) + + def error_function(variables, measured): + x_B, y_B, x_C, y_C, y_A = variables + + # Map measured distances to a,b,c,d,e,f + # measured: [A, E, D, B, F, C] + a_measured = measured[0] + e_measured = measured[1] + d_measured = measured[2] + b_measured = measured[3] + f_measured = measured[4] + c_measured = measured[5] + + # Compute each distance + # A=(0,y_A), B=(x_B,y_B), C=(x_C,y_C), D=(0,0) + a_calc = np.sqrt((x_B - 0)**2 + (y_B - y_A)**2) # A-B + b_calc = np.sqrt((x_C - x_B)**2 + (y_C - y_B)**2) # B-C + c_calc = np.sqrt(x_C**2 + y_C**2) # C-D + d_calc = y_A # A-D + e_calc = np.sqrt(x_C**2 + (y_C - y_A)**2) # A-C + f_calc = np.sqrt(x_B**2 + y_B**2) # B-D + + # Residuals + r_a = a_calc - a_measured + r_b = b_calc - b_measured + r_c = c_calc - c_measured + r_d = d_calc - d_measured + r_e = e_calc - e_measured + r_f = f_calc - f_measured + + return [r_a, r_b, r_c, r_d, r_e, r_f] + + # Run least squares optimization + result_noisy = least_squares( + error_function, + initial_guess, + args=(measured_distances_noisy,), + bounds=bounds, + loss='soft_l1' + ) + optimized_coords_noisy = result_noisy.x + + # Update anchors + self.anchors = { + "A1": (0, optimized_coords_noisy[4]), # A is fixed at origin + "A2": (optimized_coords_noisy[0], optimized_coords_noisy[1]), + "A3": (optimized_coords_noisy[2], optimized_coords_noisy[3]), + "A4": (0, 0), # D is free + } + + # Display anchors on canvas + self.draw_canvas() + + # self.output_label.config( + # text=f"Anchors Generated Successfully! Coordinates: { {k: (round(v[0], 2), round(v[1], 2)) for k, v in self.anchors.items()} }" + # ) + + except Exception as e: + self.output_label.config(text=f"Error: {str(e)}") + + def calculate_tag_position(self): + try: + # Parse distances from the tag to each anchor + distances = { + anchor: float(self.tag_distances[anchor].get()) + for anchor in self.anchors + } + + # Trilateration using nonlinear optimization + def error_function(variables): + x, y = variables + residuals = [] + for anchor, (xa, ya) in self.anchors.items(): + d_measured = distances[anchor] + d_calculated = np.sqrt((x - xa) ** 2 + (y - ya) ** 2) + residuals.append(d_calculated - d_measured) + return residuals + + # Initial guess for tag position + initial_guess = [300, 300] # Center of the canvas as the initial guess + result = least_squares(error_function, initial_guess) + + # Extract optimized tag position + x_tag, y_tag = result.x + + # Update the canvas + self.draw_canvas(x_tag, y_tag) + + # Display result + self.output_label.config(text=f"Tag Position: ({x_tag:.2f}, {y_tag:.2f})") + #print(f"Tag Position: ({x_tag:.2f}, {y_tag:.2f})") + except Exception as e: + self.output_label.config(text=f"Error: {str(e)}") + + def draw_canvas(self, x_tag=None, y_tag=None): + """Draw anchors and tag on the canvas.""" + self.canvas.delete("all") + canvas_width, canvas_height = 600, 600 + center_x, center_y = canvas_width // 2, canvas_height // 2 + scale_factor = 1 # Adjust to fit the canvas + + # Calculate offset to center anchors + # Determine the average position of the anchors + if self.anchors: + avg_x = sum(x for x, y in self.anchors.values()) / len(self.anchors) + avg_y = sum(y for x, y in self.anchors.values()) / len(self.anchors) + else: + avg_x, avg_y = 0, 0 + + # Offset to center anchors on the canvas + x_offset = center_x - avg_x + y_offset = center_y - avg_y + + # Adjust all points to fit within canvas and move the origin + def transform_coordinates(x, y): + x_scaled = int((x + x_offset) * scale_factor) + y_scaled = int(canvas_height - (y + y_offset) * scale_factor) # Flip Y-axis for top-left origin + return x_scaled, y_scaled + + # Draw anchors + for name, (x, y) in self.anchors.items(): + x_scaled, y_scaled = transform_coordinates(x, y) + rounded_x, rounded_y = round(x, 2), round(y, 2) + self.canvas.create_oval( + x_scaled - 5, y_scaled - 5, x_scaled + 5, y_scaled + 5, fill="blue" + ) + label = f"{name} ({rounded_x}, {rounded_y})" + self.canvas.create_text(x_scaled, y_scaled + 15, text=label) + + + # Draw the tag position + if x_tag is not None and y_tag is not None: + x_tag_scaled, y_tag_scaled = transform_coordinates(x_tag, y_tag) + self.canvas.create_oval( + x_tag_scaled - 5, y_tag_scaled - 5, x_tag_scaled + 5, y_tag_scaled + 5, fill="red" + ) + label = f"Tag ({round(x_tag, 2)}, {round(y_tag, 2)})" + self.canvas.create_text( + x_tag_scaled + 15, y_tag_scaled+15, text=label, fill="black" + ) + + + + def start_background_thread(self): + def fetch_data(): + while self.running: + try: + # Fetch beacon positioning distances + beacon_distances = self.serial_buffer.getBeaconPositioningDistances() + # Fetch ranging distances + ranging_distances = self.serial_buffer.getRangingDistances() + + # Update GUI distances on the main thread + self.root.after(0, self.update_gui_distances, beacon_distances, ranging_distances) + except Exception as e: + print(f"Error fetching distances: {e}") + time.sleep(1) # Adjust the frequency of fetching as needed + + self.thread = threading.Thread(target=fetch_data, daemon=True) + self.thread.start() + + def update_gui_distances(self, beacon_distances, ranging_distances): + try: + # Update anchor distances + for i, distance in enumerate(beacon_distances): + if i < len(self.measured_distances): + self.measured_distances[i].set(distance) + + # Update tag distances (if ranging_distances are not None) + if ranging_distances: + for i, distance in enumerate(ranging_distances[0]): # Assuming first list is valid + if i < len(self.tag_distances): + anchor = list(self.tag_distances.keys())[i] + self.tag_distances[anchor].set(distance) + except Exception as e: + print(f"Error updating GUI distances: {e}") + + def stop_thread(self): + self.running = False + self.thread.join() + + def __del__(self): + self.stop_thread() + del self.serial_buffer # Ensure the serial port is closed + + +if __name__ == "__main__": + root = tk.Tk() + app = AnchorTagGUI(root) + + def on_closing(): + app.stop_thread() + root.destroy() + + root.protocol("WM_DELETE_WINDOW", on_closing) + root.mainloop() \ No newline at end of file -- GitLab