From 963568040fdbc53afcb46ab465fbbab9d41412e5 Mon Sep 17 00:00:00 2001 From: mhby1g21 <mhby1g21@soton.ac.uk> Date: Wed, 30 Oct 2024 15:19:50 +0000 Subject: [PATCH] refactored edge_net_tab.py, lots less lines now, but still have some visual problems --- scripts/debug_tool/GUI_debug.py | 3 + scripts/debug_tool/tabs/edge_net_tab.py | 1271 +++++++---------------- scripts/debug_tool/utils/qt_widgets.py | 14 +- 3 files changed, 406 insertions(+), 882 deletions(-) diff --git a/scripts/debug_tool/GUI_debug.py b/scripts/debug_tool/GUI_debug.py index a4c39fb..1ce8690 100644 --- a/scripts/debug_tool/GUI_debug.py +++ b/scripts/debug_tool/GUI_debug.py @@ -1,4 +1,5 @@ from PyQt6.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QTabWidget +from PyQt6.QtCore import Qt import sys import os @@ -6,6 +7,7 @@ from tabs.config_tab import ConfigTab from tabs.shifter_tab import ShifterTab from tabs.depth_tab import DepthTab from tabs.material_tab import MaterialTab +from tabs.edge_net_tab import EdgeNetTab from utils.config_reader import ConfigReader class ModuleDebugGUI(QMainWindow): @@ -40,6 +42,7 @@ class ModuleDebugGUI(QMainWindow): self.tabs.addTab(ShifterTab(self.config_reader), "Image Shifter") self.tabs.addTab(DepthTab(self.config_reader), "MonoDepth Estimation") self.tabs.addTab(MaterialTab(self.config_reader), "Material Recognition") + self.tabs.addTab(EdgeNetTab(self.config_reader), "EdgeNet Execution") def main(): app = QApplication(sys.argv) diff --git a/scripts/debug_tool/tabs/edge_net_tab.py b/scripts/debug_tool/tabs/edge_net_tab.py index f6018d6..1a86c80 100644 --- a/scripts/debug_tool/tabs/edge_net_tab.py +++ b/scripts/debug_tool/tabs/edge_net_tab.py @@ -1,983 +1,494 @@ -import tkinter as tk -from tkinter import ttk, messagebox, filedialog +from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QCheckBox, QProgressBar, QTextEdit, + QMessageBox, QFileDialog) +from PyQt6.QtCore import Qt, QThread, pyqtSignal, QTimer +from PyQt6.QtGui import QPixmap import os -import subprocess -import shutil -from PIL import Image, ImageTk import threading import queue import time +import sys -class EdgeNetTab: - def __init__(self, notebook, config_reader): - self.tab = ttk.Frame(notebook) - notebook.add(self.tab, text='EdgeNet Processing') - +from utils.file_handlers import (select_file, run_command, clean_directory, + copy_file) +from utils.image_handlers import update_preview, clear_previews +from utils.qt_widgets import (create_group_with_text, create_button_layout, + create_preview_group, create_status_label, + update_status_indicator, show_confirmation_dialog) + +class ProcessThread(QThread): + finished = pyqtSignal(bool, str) + progress = pyqtSignal(str) + + def __init__(self, func, *args, **kwargs): + super().__init__() + self.func = func + self.args = args + self.kwargs = kwargs + self.running = True + + def run(self): + try: + if self.running: + result = self.func(*self.args, **self.kwargs) + self.finished.emit(True, "") + except Exception as e: + self.finished.emit(False, str(e)) + + def stop(self): + self.running = False + self.wait() + +class EdgeNetTab(QWidget): + def __init__(self, config_reader): + super().__init__() self.config_reader = config_reader + self.setup_paths() + self.init_variables() + self.init_ui() + self.connect_signals() + self.current_thread = None + + def closeEvent(self, event): + """Handle widget close event""" + if self.current_thread and self.current_thread.isRunning(): + self.current_thread.stop() + self.current_thread.wait() + event.accept() + + def setup_paths(self): + """Initialize directory and file paths""" self.edge_net_dir = self.config_reader.directories['edgeNetDir'] - self.output_dir = self.config_reader.directories['outputDir'] - self.script_dir = self.config_reader.directories['scriptDir'] - - # Updated default input directory + self.output_dir = self.config_reader.directories['outputDir'] self.input_dir = os.path.join(self.edge_net_dir, "Data", "Input") - # Option for including top - self.include_top = tk.BooleanVar(value=False) - - # Dictionary to store manual input paths + def init_variables(self): + """Initialize class variables""" + self.include_top = False + self.is_processing = False self.manual_inputs = { 'depth_e.png': None, 'rgb.png': None, 'material.png': None } - - # Add queue for process output self.output_queue = queue.Queue() - # Add processing flag - self.is_processing = False + def init_ui(self): + """Initialize user interface""" + layout = QVBoxLayout() - # Add progress variables - self.progress_var = tk.DoubleVar(value=0) + # Split into left and right sections + hlayout = QHBoxLayout() + left_panel = self.create_left_panel() + right_panel = self.create_right_panel() - self.setup_ui() + hlayout.addWidget(left_panel) + hlayout.addWidget(right_panel) + layout.addLayout(hlayout) - # Start output monitoring - self.monitor_output() - - def setup_ui(self): - # Split into left and right frames - left_frame = ttk.Frame(self.tab) - left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) + self.setLayout(layout) - right_frame = ttk.Frame(self.tab) - right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5) + def create_left_panel(self): + """Create left control panel""" + widget = QWidget() + layout = QVBoxLayout() - self.setup_control_panel(left_frame) - self.setup_preview_panel(right_frame) - - def setup_control_panel(self, parent): - # Control section - control_frame = ttk.LabelFrame(parent, text="Controls", padding="5") - control_frame.pack(fill=tk.X, pady=5) - - # File input section - file_frame = ttk.LabelFrame(control_frame, text="Input Files", padding="5") - file_frame.pack(fill=tk.X, pady=5) - - # Enhance360 inputs - enhance_frame = ttk.LabelFrame(file_frame, text="enhance360.py inputs", padding="5") - enhance_frame.pack(fill=tk.X, pady=2) - - # depth_e.png input - depth_frame = ttk.Frame(enhance_frame) - depth_frame.pack(fill=tk.X, pady=2) - ttk.Label(depth_frame, text="depth_e.png:").pack(side=tk.LEFT, padx=5) - self.depth_label = ttk.Label(depth_frame, text="Using default") - self.depth_label.pack(side=tk.LEFT, padx=5) - ttk.Button( - depth_frame, - text="Select", - command=lambda: self.select_input_file("depth_e.png") - ).pack(side=tk.RIGHT, padx=5) - - # rgb.png input - rgb_frame = ttk.Frame(enhance_frame) - rgb_frame.pack(fill=tk.X, pady=2) - ttk.Label(rgb_frame, text="rgb.png:").pack(side=tk.LEFT, padx=5) - self.rgb_label = ttk.Label(rgb_frame, text="Using default") - self.rgb_label.pack(side=tk.LEFT, padx=5) - ttk.Button( - rgb_frame, - text="Select", - command=lambda: self.select_input_file("rgb.png") - ).pack(side=tk.RIGHT, padx=5) - - # Infer360 inputs - infer_frame = ttk.LabelFrame(file_frame, text="infer360.py inputs", padding="5") - infer_frame.pack(fill=tk.X, pady=2) - - # material.png input - material_frame = ttk.Frame(infer_frame) - material_frame.pack(fill=tk.X, pady=2) - ttk.Label(material_frame, text="material.png:").pack(side=tk.LEFT, padx=5) - self.material_label = ttk.Label(material_frame, text="Using default") - self.material_label.pack(side=tk.LEFT, padx=5) - ttk.Button( - material_frame, - text="Select", - command=lambda: self.select_input_file("material.png") - ).pack(side=tk.RIGHT, padx=5) - - # Reset inputs button - ttk.Button( - file_frame, - text="Reset to Default Inputs", - command=self.reset_inputs - ).pack(pady=5) - - # Include top checkbox - ttk.Checkbutton( - control_frame, - text="Include Top in Mesh", - variable=self.include_top - ).pack(anchor=tk.W, padx=5, pady=5) - - # Progress indicators - self.progress_frame = ttk.LabelFrame(control_frame, text="Progress", padding="5") - self.progress_frame.pack(fill=tk.X, pady=5) - - # EdgeNet progress - edge_frame = ttk.Frame(self.progress_frame) - edge_frame.pack(fill=tk.X, pady=2) - ttk.Label(edge_frame, text="EdgeNet:").pack(side=tk.LEFT, padx=5) - self.edge_status = ttk.Label(edge_frame, text="Not started") - self.edge_status.pack(side=tk.LEFT, padx=5) - - # Mesh splitting progress - split_frame = ttk.Frame(self.progress_frame) - split_frame.pack(fill=tk.X, pady=2) - ttk.Label(split_frame, text="Mesh Splitting:").pack(side=tk.LEFT, padx=5) - self.split_status = ttk.Label(split_frame, text="Not started") - self.split_status.pack(side=tk.LEFT, padx=5) - - # Blender flip progress - flip_frame = ttk.Frame(self.progress_frame) - flip_frame.pack(fill=tk.X, pady=2) - ttk.Label(flip_frame, text="Blender Flip:").pack(side=tk.LEFT, padx=5) - self.flip_status = ttk.Label(flip_frame, text="Not started") - self.flip_status.pack(side=tk.LEFT, padx=5) + # Input files section + files_group = self.create_input_files_group() + layout.addWidget(files_group) - # Control buttons - button_frame = ttk.Frame(control_frame) - button_frame.pack(fill=tk.X, pady=5) - - ttk.Button( - button_frame, - text="Run EdgeNet", - command=self.run_edge_net - ).pack(side=tk.LEFT, padx=5) - - ttk.Button( - button_frame, - text="Run Mesh Split", - command=self.run_mesh_split - ).pack(side=tk.LEFT, padx=5) - - ttk.Button( - button_frame, - text="Run Blender Flip", - command=self.run_blender_flip - ).pack(side=tk.LEFT, padx=5) - - ttk.Button( - button_frame, - text="Run All Steps", - command=self.run_all_steps - ).pack(side=tk.LEFT, padx=5) - - ttk.Button( - button_frame, - text="Clean Temp Files", - command=self.clean_temp_files - ).pack(side=tk.LEFT, padx=5) - - ttk.Button( - button_frame, - text="Clear Status", - command=self.clear_status - ).pack(side=tk.RIGHT, padx=5) + # Include top checkbox with Qt6 state + self.top_checkbox = QCheckBox("Include Top in Mesh") + self.top_checkbox.setCheckState(Qt.CheckState.Unchecked) + self.top_checkbox.stateChanged.connect(self.on_include_top_changed) + layout.addWidget(self.top_checkbox) # Status section - status_frame = ttk.LabelFrame(parent, text="Status", padding="5") - status_frame.pack(fill=tk.BOTH, expand=True, pady=5) - - # Add scrollbar to status - status_scroll = ttk.Scrollbar(status_frame) - status_scroll.pack(side=tk.RIGHT, fill=tk.Y) - - self.status_text = tk.Text( - status_frame, - height=10, - wrap=tk.WORD, - yscrollcommand=status_scroll.set - ) - self.status_text.pack(fill=tk.BOTH, expand=True) - status_scroll.config(command=self.status_text.yview) + self.status_group, self.status_text = create_group_with_text("Status") + layout.addWidget(self.status_group) - # Add progress bar - progress_frame = ttk.LabelFrame(parent, text="Progress", padding="5") - progress_frame.pack(fill=tk.X, pady=5) + # Progress section + progress_group = self.create_progress_group() + layout.addWidget(progress_group) - self.progress_bar = ttk.Progressbar( - progress_frame, - mode='indeterminate', - variable=self.progress_var - ) - self.progress_bar.pack(fill=tk.X, padx=5, pady=5) - - # Add current operation label - self.operation_label = ttk.Label(progress_frame, text="") - self.operation_label.pack(pady=5) - - # Add cancel button (initially disabled) - self.cancel_button = ttk.Button( - progress_frame, - text="Cancel Operation", - command=self.cancel_operation, - state='disabled' - ) - self.cancel_button.pack(pady=5) - - # Add separator before clean buttons - ttk.Separator(button_frame, orient='horizontal').pack(fill=tk.X, pady=5) - - # Create frame for clean buttons - clean_frame = ttk.Frame(button_frame) - clean_frame.pack(fill=tk.X, pady=5) - - # Add clean output button - ttk.Button( - clean_frame, - text="Clean Output Directory", - command=self.clean_output_directory, - style='CleanOutput.TButton' # Custom style for warning-like button but less severe - ).pack(side=tk.LEFT, padx=5) - - # Add clean all files button - ttk.Button( - clean_frame, - text="Clean All Files", - command=self.clean_all_files, - style='CleanAll.TButton' # Custom style for warning-like button - ).pack(side=tk.LEFT, padx=5) - - # Create custom styles for clean buttons - style = ttk.Style() - style.configure('CleanOutput.TButton', foreground='orange') # Less severe warning color - style.configure('CleanAll.TButton', foreground='red') # Severe warning color - - def run_process_with_output(self, cmd, description): - """Run a process and capture output in real-time""" - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - text=True, - bufsize=1, - universal_newlines=True - ) - - # Function to read output stream - def read_output(stream, queue): - for line in iter(stream.readline, ''): - queue.put(line) - stream.close() - - # Start output reader threads - stdout_thread = threading.Thread( - target=read_output, - args=(process.stdout, self.output_queue) - ) - stderr_thread = threading.Thread( - target=read_output, - args=(process.stderr, self.output_queue) + # Control buttons + button_layout = self.create_control_buttons() + layout.addLayout(button_layout) + + widget.setLayout(layout) + return widget + + def create_input_files_group(self): + """Create input files selection group""" + group = QWidget() + layout = QVBoxLayout() + + # File selection buttons + for file_type in self.manual_inputs.keys(): + row = QHBoxLayout() + row.addWidget(QLabel(f"{file_type}:")) + label = QLabel("Using default") + setattr(self, f"{file_type.split('.')[0]}_label", label) + row.addWidget(label) + + btn = QPushButton("Select") + btn.clicked.connect(lambda checked, ft=file_type: self.select_input_file(ft)) + row.addWidget(btn) + + layout.addLayout(row) + + # Reset button + reset_btn = QPushButton("Reset to Default Inputs") + reset_btn.clicked.connect(self.reset_inputs) + layout.addWidget(reset_btn) + + group.setLayout(layout) + return group + + def create_progress_group(self): + """Create progress indicators group""" + group = QWidget() + layout = QVBoxLayout() + + # Status indicators + self.edge_status = create_status_label("EdgeNet:") + self.split_status = create_status_label("Mesh Split:") + self.flip_status = create_status_label("Blender Flip:") + + layout.addLayout(self.edge_status) + layout.addLayout(self.split_status) + layout.addLayout(self.flip_status) + + # Progress bar + self.progress_bar = QProgressBar() + layout.addWidget(self.progress_bar) + + # Operation label + self.operation_label = QLabel() + layout.addWidget(self.operation_label) + + group.setLayout(layout) + return group + + def create_right_panel(self): + """Create right preview panel""" + widget = QWidget() + layout = QVBoxLayout() + + # Preview groups + depth_group, self.depth_preview = create_preview_group("Enhanced Depth") + mesh_group, self.mesh_preview = create_preview_group("Generated Mesh") + + layout.addWidget(depth_group) + layout.addWidget(mesh_group) + + widget.setLayout(layout) + return widget + + def create_control_buttons(self): + """Create control buttons layout""" + return create_button_layout( + ("Run EdgeNet", self.run_edge_net, 'left'), + ("Run Mesh Split", self.run_mesh_split, 'left'), + ("Run Blender Flip", self.run_blender_flip, 'left'), + ("Run All Steps", self.run_all_steps, 'left'), + ("Clean Output", self.clean_output_directory, 'right'), + ("Clean All", self.clean_all_files, 'right') ) - stdout_thread.daemon = True - stderr_thread.daemon = True - stdout_thread.start() - stderr_thread.start() - - # Wait for process to complete - return_code = process.wait() - - # Wait for output threads to finish - stdout_thread.join() - stderr_thread.join() - - return return_code - - def monitor_output(self): - """Monitor and display process output""" - try: - while True: - try: - line = self.output_queue.get_nowait() - self.update_status(line.strip()) - self.status_text.see(tk.END) - except queue.Empty: - break - finally: - # Schedule next check - if self.is_processing: - self.tab.after(100, self.monitor_output) - - def setup_preview_panel(self, parent): - preview_frame = ttk.LabelFrame(parent, text="Process Previews", padding="5") - preview_frame.pack(fill=tk.BOTH, expand=True) - - # Enhanced depth preview - depth_frame = ttk.LabelFrame(preview_frame, text="Enhanced Depth") - depth_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - - self.depth_preview = ttk.Label(depth_frame) - self.depth_preview.pack(padx=5, pady=5) - - # Mesh preview - mesh_frame = ttk.LabelFrame(preview_frame, text="Generated Mesh") - mesh_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - - self.mesh_preview = ttk.Label(mesh_frame) - self.mesh_preview.pack(padx=5, pady=5) + def connect_signals(self): + """Connect signals and slots""" + # Add any additional signal connections here + pass def select_input_file(self, file_type): - """Select a manual input file""" - filepath = filedialog.askopenfilename( - filetypes=[("PNG files", "*.png")] - ) - if filepath: - self.manual_inputs[file_type] = os.path.normpath(filepath) - # Update corresponding label - if file_type == "depth_e.png": - self.depth_label.config(text=os.path.basename(filepath)) - elif file_type == "rgb.png": - self.rgb_label.config(text=os.path.basename(filepath)) - elif file_type == "material.png": - self.material_label.config(text=os.path.basename(filepath)) + """Handle input file selection""" + file_path = select_file(self, "Select Input File", "Image Files (*.png)") + if file_path: + self.manual_inputs[file_type] = file_path + label = getattr(self, f"{file_type.split('.')[0]}_label") + label.setText(os.path.basename(file_path)) + self.update_status(f"Selected {file_type}: {file_path}") - self.update_status(f"Selected {file_type}: {filepath}") - def reset_inputs(self): - """Reset all inputs to default""" - self.manual_inputs = { - 'depth_e.png': None, - 'rgb.png': None, - 'material.png': None - } - self.depth_label.config(text="Using default") - self.rgb_label.config(text="Using default") - self.material_label.config(text="Using default") + """Reset input selections to default""" + self.manual_inputs = {k: None for k in self.manual_inputs} + for file_type in self.manual_inputs: + label = getattr(self, f"{file_type.split('.')[0]}_label") + label.setText("Using default") self.update_status("Reset all inputs to default") - - def get_input_path(self, file_type): - """Get the path for an input file, considering manual inputs""" - if self.manual_inputs[file_type]: - return self.manual_inputs[file_type] - return os.path.join(self.input_dir, file_type) # Using updated input_dir - - def copy_input_files(self): - """Copy manual input files to Input directory if needed""" - os.makedirs(self.input_dir, exist_ok=True) - for file_type, manual_path in self.manual_inputs.items(): - if manual_path: - dest_path = os.path.join(self.input_dir, file_type) - try: - shutil.copy2(manual_path, dest_path) - self.update_status(f"Copied {file_type} to Input directory") - except Exception as e: - self.update_status(f"Error copying {file_type}: {str(e)}") - - def verify_enhance360_inputs(self): - """Verify input files for enhance360.py""" - required_files = { - 'depth_e.png': self.get_input_path('depth_e.png'), - 'rgb.png': self.get_input_path('rgb.png') - } + def run_edge_net(self): + """Run EdgeNet processing""" + if self.is_processing: + QMessageBox.warning(self, "Warning", "A process is already running!") + return + + self.is_processing = True + self.progress_bar.setMaximum(0) - self.update_status("Checking input files in: " + self.input_dir) + self.current_thread = ProcessThread(self._run_edge_net_process) + self.current_thread.finished.connect(self.on_edge_net_complete) + self.current_thread.progress.connect(self.update_status) + self.current_thread.start() - missing_files = [] - for name, path in required_files.items(): - if not os.path.exists(path): - missing_files.append(f"{name} ({path})") - - if missing_files: - self.update_status("Missing required files for enhance360.py:") - for file in missing_files: - self.update_status(f"- {file}") - return False + def clean_output_directory(self): + if show_confirmation_dialog(self, "Confirm Clean", "Clean output directory?"): + if clean_directory(self.output_dir, self.update_status): + self.update_status("Output directory cleaned") + update_status_indicator(self.split_status, "Not started") + update_status_indicator(self.flip_status, "Not started") + clear_previews(self.mesh_preview) + + def clean_all_files(self): + if show_confirmation_dialog(self, "Confirm Clean", "Clean all files?"): + directories = [self.input_dir, self.output_dir] + for directory in directories: + clean_directory(directory, self.update_status) - self.update_status("All required files present for enhance360.py") - return True + update_status_indicator(self.edge_status, "Not started") + update_status_indicator(self.split_status, "Not started") + update_status_indicator(self.flip_status, "Not started") + clear_previews(self.depth_preview, self.mesh_preview) - def verify_infer360_inputs(self): - """Verify input files for infer360.py""" + def update_status(self, message): + """Update status text""" + self.status_text.append(message) + + def on_include_top_changed(self, state): + """Handle include top checkbox change""" + self.include_top = state == Qt.CheckState.Checked + + def verify_inputs(self): + """Verify input files exist""" required_files = { - 'enhanced_depth_e.png': os.path.join(self.input_dir, "enhanced_depth_e.png"), - 'material.png': os.path.join(self.input_dir, "material.png"), - 'rgb.png': os.path.join(self.input_dir, "rgb.png") + 'depth_e.png': self.get_input_path('depth_e.png'), + 'rgb.png': self.get_input_path('rgb.png') } - missing_files = [] - for name, path in required_files.items(): - if not os.path.exists(path): - missing_files.append(f"{name} ({path})") - - if missing_files: - self.update_status("Missing required files for infer360.py:") - for file in missing_files: - self.update_status(f"- {file}") + missing = [f for f, p in required_files.items() if not os.path.exists(p)] + if missing: + self.update_status("Missing required files: " + ", ".join(missing)) return False - - self.update_status("All required files present for infer360.py") return True - - def run_edge_net(self): - if self.is_processing: - messagebox.showwarning("Warning", "A process is already running!") - return - - # Start processing in a separate thread - thread = threading.Thread(target=self._run_edge_net_thread) - thread.daemon = True - thread.start() - # Start progress bar - self.progress_bar.start(10) - self.is_processing = True - self.cancel_button.config(state='normal') + def get_input_path(self, file_type): + """Get path for input file""" + return self.manual_inputs.get(file_type) or os.path.join(self.input_dir, file_type) - # Start output monitoring - self.monitor_output() - - def _run_edge_net_thread(self): - """Run EdgeNet processing in a separate thread""" + def _run_edge_net_process(self): + """Run EdgeNet processing""" + # Change to EdgeNet directory + os.chdir(self.edge_net_dir) try: - # Verify input files first - self.operation_label.config(text="Verifying input files...") - if not self.verify_enhance360_inputs(): - self.update_status("Attempting to copy manual inputs...") + if not self.verify_inputs(): self.copy_input_files() - - if not self.verify_enhance360_inputs(): - raise Exception("Missing required input files for enhance360.py") + if not self.verify_inputs(): + raise Exception("Missing required input files") + + # Run enhance360.py + enhance_cmd = self._build_enhance_command() + if run_command(self, enhance_cmd, self.update_status)[0]: + # Run infer360.py + infer_cmd = self._build_infer_command() + return run_command(self, infer_cmd, self.update_status)[0] + return False - original_dir = os.getcwd() - os.chdir(self.edge_net_dir) + except Exception as e: + self.update_status(f"Error: {str(e)}") + return False - try: - # Run enhance360.py - self.operation_label.config(text="Running enhance360.py...") - enhance_cmd = (f'wsl bash -c "source {self.config_reader.config["wslAnacondaDir"]}/activate' - f' {self.config_reader.config["edgeNetEnv"]} && ' - f'python enhance360.py Input depth_e.png rgb.png enhanced_depth_e.png"') - - self.update_status(f"Executing command:\n{enhance_cmd}") - - if self.run_process_with_output(enhance_cmd, "enhance360.py") != 0: - raise Exception("enhance360.py failed") - - # Verify files for infer360.py - self.operation_label.config(text="Verifying infer360.py inputs...") - if not self.verify_infer360_inputs(): - raise Exception("Missing required input files for infer360.py") - - # Run infer360.py - self.operation_label.config(text="Running infer360.py...") - include_top_flag = "--include_top y" if self.include_top.get() else "" - infer_cmd = (f'wsl bash -c "source {self.config_reader.config["wslAnacondaDir"]}/activate' - f' {self.config_reader.config["edgeNetEnv"]} && ' - f'python infer360.py Input enhanced_depth_e.png material.png rgb.png Input {include_top_flag}"') - - self.update_status(f"Executing command:\n{infer_cmd}") + def _build_enhance_command(self): + """Build enhance360.py command""" + return (f'wsl bash -c "source {self.config_reader.config["wslAnacondaDir"]}/activate' + f' {self.config_reader.config["edgeNetEnv"]} && ' + f'python enhance360.py Input depth_e.png rgb.png enhanced_depth_e.png"') - if self.run_process_with_output(infer_cmd, "infer360.py") != 0: - raise Exception("infer360.py failed") + def _build_infer_command(self): + """Build infer360.py command""" + include_top = "--include_top y" if self.include_top else "" + return (f'wsl bash -c "source {self.config_reader.config["wslAnacondaDir"]}/activate' + f' {self.config_reader.config["edgeNetEnv"]} && ' + f'python infer360.py Input enhanced_depth_e.png material.png rgb.png Input {include_top}"') - finally: - os.chdir(original_dir) - # Deactivate WSL environment - subprocess.run('wsl bash -c "conda deactivate"', shell=True) - - self.tab.after(0, self._process_complete, True) - - except Exception as e: - self.tab.after(0, self._process_complete, False, str(e)) - - finally: - self.is_processing = False - self.update_depth_preview() - - def _process_complete(self, success, error_message=None): - """Handle process completion in the main thread""" - self.progress_bar.stop() - self.cancel_button.config(state='disabled') - self.operation_label.config(text="") + def on_edge_net_complete(self, success, error_message): + """Handle EdgeNet completion""" + self.is_processing = False + self.progress_bar.setMaximum(100) if success: - self.update_status("EdgeNet processing completed successfully") - self.edge_status.config(text="✓ Complete", foreground="green") + update_status_indicator(self.edge_status, "Complete") self.update_depth_preview() else: - self.update_status(f"Error in EdgeNet: {error_message}") - self.edge_status.config(text="✗ Failed", foreground="red") - messagebox.showerror("Error", f"EdgeNet failed: {error_message}") - - def cancel_operation(self): - """Attempt to cancel the current operation""" - if not self.is_processing: - return + update_status_indicator(self.edge_status, "Failed") + QMessageBox.critical(self, "Error", f"EdgeNet failed: {error_message}") - if messagebox.askyesno("Confirm Cancel", "Are you sure you want to cancel the current operation?"): - self.update_status("Attempting to cancel operation...") - # TODO: Implement actual process cancellation - # This might require storing process handles and sending signals - self.is_processing = False - - def update_status(self, message): - """Thread-safe status update""" - self.tab.after(0, self._update_status_safe, message) + def update_depth_preview(self): + """Update depth preview image""" + depth_path = os.path.join(self.input_dir, "enhanced_depth_e.png") + update_preview(self.depth_preview, depth_path, 300) - def _update_status_safe(self, message): - """Update status text in the main thread""" - self.status_text.insert(tk.END, f"{message}\n") - self.status_text.see(tk.END) - def run_mesh_split(self): if self.is_processing: - messagebox.showwarning("Warning", "A process is already running!") + QMessageBox.warning(self, "Warning", "A process is already running!") return - # Start processing in a separate thread - thread = threading.Thread(target=self._run_mesh_split_thread) - thread.daemon = True - thread.start() - - # Start progress bar - self.progress_bar.start(10) self.is_processing = True - self.cancel_button.config(state='normal') - self.monitor_output() + self.progress_bar.setMaximum(0) + + self.current_thread = ProcessThread(self._run_mesh_split_process) + self.current_thread.finished.connect(self.on_mesh_split_complete) + self.current_thread.progress.connect(self.update_status) + self.current_thread.start() - def _run_mesh_split_thread(self): - """Run mesh splitting in a separate thread""" + def _run_mesh_split_process(self): + """Execute mesh splitting process""" + # Change to EdgeNet directory + os.chdir(self.edge_net_dir) try: - # Verify input files first - if not self.verify_mesh_split_inputs(): - raise Exception("Missing required input files for mesh splitting") + if not os.path.exists(os.path.join(self.output_dir, "Input_prediction.obj")): + raise Exception("Missing Input_prediction.obj file") - self.update_status("Running mesh splitting...") - self.split_status.config(text="Running...", foreground="orange") - - original_dir = os.getcwd() - os.chdir(self.edge_net_dir) + cmd = (f'call "{self.config_reader.config["condaDir"]}\\Scripts\\activate.bat" ' + f'{self.config_reader.config["materialEnv"]} && ' + f'python replace.py') + + return run_command(self, cmd, self.update_status)[0] - try: - # Construct and run command - cmd = (f'call "{self.config_reader.config["condaDir"]}\\Scripts\\activate.bat" ' - f'{self.config_reader.config["materialEnv"]} && ' - f'python replace.py') - - self.update_status(f"Executing command:\n{cmd}") - - if self.run_process_with_output(cmd, "Mesh Splitting") != 0: - raise Exception("Mesh splitting process failed") - - # Verify output files were created - output_obj = os.path.join(self.output_dir, "Input_prediction_mesh.obj") - - if not (os.path.exists(output_obj)): - raise Exception("Mesh splitting did not generate expected output files") - - self.tab.after(0, self._split_complete, True) - - finally: - os.chdir(original_dir) - except Exception as e: - self.tab.after(0, self._split_complete, False, str(e)) - - finally: - self.is_processing = False + self.update_status(f"Error: {str(e)}") + return False - def _split_complete(self, success, error_message=None): - """Handle mesh split completion in the main thread""" - self.progress_bar.stop() - self.cancel_button.config(state='disabled') - self.operation_label.config(text="") + def on_mesh_split_complete(self, success, error_message): + """Handle mesh split completion""" + self.is_processing = False + self.progress_bar.setMaximum(100) if success: - self.update_status("Mesh splitting completed successfully") - self.split_status.config(text="✓ Complete", foreground="green") + update_status_indicator(self.split_status, "Complete") + self.update_mesh_preview() else: - self.update_status(f"Error in mesh splitting: {error_message}") - self.split_status.config(text="✗ Failed", foreground="red") - messagebox.showerror("Error", f"Mesh splitting failed: {error_message}") + update_status_indicator(self.split_status, "Failed") + QMessageBox.critical(self, "Error", f"Mesh splitting failed: {error_message}") def run_blender_flip(self): if self.is_processing: - messagebox.showwarning("Warning", "A process is already running!") + QMessageBox.warning(self, "Warning", "A process is already running!") return - # Start processing in a separate thread - thread = threading.Thread(target=self._run_blender_flip_thread) - thread.daemon = True - thread.start() - - # Start progress bar - self.progress_bar.start(10) self.is_processing = True - self.cancel_button.config(state='normal') - self.monitor_output() + self.progress_bar.setMaximum(0) + + self.current_thread = ProcessThread(self._run_blender_flip_process) + self.current_thread.finished.connect(self.on_blender_flip_complete) + self.current_thread.progress.connect(self.update_status) + self.current_thread.start() - def _run_blender_flip_thread(self): - """Run blender flip in a separate thread""" + def _run_blender_flip_process(self): + """Execute Blender flip process""" + # Change to scripts directory + os.chdir(self.config_reader.directories['scriptDir']) try: - # Verify input files first - if not self.verify_blender_flip_inputs(): - raise Exception("Missing required input files for Blender flip") + mesh_path = os.path.join(self.output_dir, "Input_prediction_mesh.obj") + if not os.path.exists(mesh_path): + raise Exception("Missing Input_prediction_mesh.obj file") - self.update_status("Running Blender flip...") - self.flip_status.config(text="Running...", foreground="orange") - - original_dir = os.getcwd() - os.chdir(self.config_reader.directories['scriptDir']) + cmd = (f'call "{self.config_reader.config["condaDir"]}\\Scripts\\activate.bat" ' + f'{self.config_reader.config["unityEnv"]} && ' + f'python blenderFlip.py "{mesh_path}"') + + return run_command(self, cmd, self.update_status)[0] - try: - # Construct and run command - cmd = (f'call "{self.config_reader.config["condaDir"]}\\Scripts\\activate.bat" ' - f'{self.config_reader.config["unityEnv"]} && ' - f'python blenderFlip.py ' - f'"{os.path.join(self.output_dir, "Input_prediction_mesh.obj")}"') - - self.update_status(f"Executing command:\n{cmd}") - - if self.run_process_with_output(cmd, "Blender Flip") != 0: - raise Exception("Blender flip process failed") - - # Verify output files were created - output_obj = os.path.join(self.output_dir, "final_output_scene_mesh.obj") - - if not (os.path.exists(output_obj)): - raise Exception("Blender flip did not generate expected output files") - - self.tab.after(0, self._flip_complete, True) - - finally: - os.chdir(original_dir) - except Exception as e: - self.tab.after(0, self._flip_complete, False, str(e)) - - finally: - self.is_processing = False + self.update_status(f"Error: {str(e)}") + return False - def _flip_complete(self, success, error_message=None): - """Handle blender flip completion in the main thread""" - self.progress_bar.stop() - self.cancel_button.config(state='disabled') - self.operation_label.config(text="") + def on_blender_flip_complete(self, success, error_message): + """Handle Blender flip completion""" + self.is_processing = False + self.progress_bar.setMaximum(100) if success: - self.update_status("Blender flip completed successfully") - self.flip_status.config(text="✓ Complete", foreground="green") + update_status_indicator(self.flip_status, "Complete") + self.update_mesh_preview() else: - self.update_status(f"Error in Blender flip: {error_message}") - self.flip_status.config(text="✗ Failed", foreground="red") - messagebox.showerror("Error", f"Blender flip failed: {error_message}") + update_status_indicator(self.flip_status, "Failed") + QMessageBox.critical(self, "Error", f"Blender flip failed: {error_message}") - def verify_mesh_split_inputs(self): - """Verify input files for mesh splitting""" - required_files = { - 'input_prediction.obj': os.path.join(self.output_dir, "Input_prediction.obj") - } - - missing_files = [] - for name, path in required_files.items(): - if not os.path.exists(path): - missing_files.append(f"{name} ({path})") - - if missing_files: - self.update_status("Missing required files for mesh splitting:") - for file in missing_files: - self.update_status(f"- {file}") - return False - - self.update_status("All required files present for mesh splitting") - return True - - def verify_blender_flip_inputs(self): - """Verify input files for Blender flip""" - required_files = { - 'Input_prediction_mesh.obj': os.path.join(self.output_dir, "Input_prediction_mesh.obj") - } - - missing_files = [] - for name, path in required_files.items(): - if not os.path.exists(path): - missing_files.append(f"{name} ({path})") - - if missing_files: - self.update_status("Missing required files for Blender flip:") - for file in missing_files: - self.update_status(f"- {file}") - return False - - self.update_status("All required files present for Blender flip") - return True - def run_all_steps(self): - """Run all EdgeNet processing steps in sequence""" if self.is_processing: - messagebox.showwarning("Warning", "A process is already running!") + QMessageBox.warning(self, "Warning", "A process is already running!") return - # Start processing in a separate thread - thread = threading.Thread(target=self._run_all_steps_thread) - thread.daemon = True - thread.start() + self.current_thread = ProcessThread(self._run_all_steps_process) + self.current_thread.finished.connect(self.on_all_steps_complete) + self.current_thread.progress.connect(self.update_status) + self.current_thread.start() - def _run_all_steps_thread(self): - """Run all steps sequentially in a thread""" - try: - self.update_status("\nStarting complete pipeline processing...") + def cancel_operation(self): + if not self.is_processing: + return - # Run EdgeNet and wait for completion - self.update_status("\nStep 1: Running EdgeNet...") - self.run_edge_net() - while self.is_processing: - time.sleep(0.1) - - # Check EdgeNet completion status - if self.edge_status.cget("text") == "✓ Complete": - # Run Mesh Split and wait for completion - self.update_status("\nStep 2: Running Mesh Split...") - self.run_mesh_split() - while self.is_processing: - time.sleep(0.1) - - # Check Mesh Split completion status - if self.split_status.cget("text") == "✓ Complete": - # Run Blender Flip and wait for completion - self.update_status("\nStep 3: Running Blender Flip...") - self.run_blender_flip() - while self.is_processing: - time.sleep(0.1) - - # Final check - if self.flip_status.cget("text") == "✓ Complete": - self.update_status("\nComplete pipeline processing finished successfully!") - self.tab.after(0, messagebox.showinfo, "Success", - "All processing steps completed successfully!") - - except Exception as e: - error_msg = f"Error in processing pipeline: {str(e)}" - self.update_status(f"\n{error_msg}") - self.tab.after(0, messagebox.showerror, "Error", error_msg) + if show_confirmation_dialog(self, "Confirm Cancel", "Cancel current operation?"): + self.update_status("Cancelling operation...") + if self.current_thread: + self.current_thread.stop() + self.is_processing = False - def clean_temp_files(self): - """Clean up temporary files""" + def _run_all_steps_process(self): + """Execute complete pipeline process""" try: - temp_files = [ - self.config_reader.file_paths['shiftedImage'], - self.config_reader.file_paths['monoDepthImage'] - ] + self.update_status("Starting complete pipeline processing...") - for file_path in temp_files: - if os.path.exists(file_path): - os.remove(file_path) - self.update_status(f"Removed temporary file: {file_path}") - - self.update_status("Temporary files cleaned up") + # Run EdgeNet + self.update_status("Running EdgeNet...") + update_status_indicator(self.edge_status, "Running") + if not self._run_edge_net_process(): + raise Exception("EdgeNet processing failed") + + # Run Mesh Split + self.update_status("Running Mesh Split...") + update_status_indicator(self.split_status, "Running") + if not self._run_mesh_split_process(): + raise Exception("Mesh splitting failed") + + # Run Blender Flip + self.update_status("Running Blender Flip...") + update_status_indicator(self.flip_status, "Running") + if not self._run_blender_flip_process(): + raise Exception("Blender flip failed") + + return True except Exception as e: - self.update_status(f"Error cleaning temporary files: {str(e)}") - messagebox.showerror("Error", f"Failed to clean temporary files: {str(e)}") - - def update_status(self, message): - self.status_text.insert(tk.END, f"{message}\n") - self.status_text.see(tk.END) - - def clear_status(self): - self.status_text.delete(1.0, tk.END) + self.update_status(f"Pipeline error: {str(e)}") + return False - def update_depth_preview(self): - """Update the enhanced depth preview""" - preview_size = (300, 150) # Adjust size as needed - depth_path = os.path.join(self.input_dir, "enhanced_depth_e.png") + def on_all_steps_complete(self, success, error_message): + """Handle complete pipeline completion""" + self.is_processing = False + self.progress_bar.setMaximum(100) - if os.path.exists(depth_path): - try: - depth_img = Image.open(depth_path) - depth_img.thumbnail(preview_size) - depth_photo = ImageTk.PhotoImage(depth_img) - self.depth_preview.configure(image=depth_photo) - self.depth_preview.image = depth_photo - except Exception as e: - self.update_status(f"Failed to load depth preview: {str(e)}") + if success: + self.update_status("Complete pipeline processing finished successfully!") + QMessageBox.information(self, "Success", + "All processing steps completed successfully!") + else: + self.update_status(f"Pipeline failed: {error_message}") + QMessageBox.critical(self, "Error", f"Pipeline failed: {error_message}") def update_mesh_preview(self): - """Update the mesh preview""" - # Note: This would require additional implementation to render mesh preview - # Could potentially use a screenshot or placeholder image - self.update_status("Mesh preview not implemented") - - def clean_all_files(self): - """Clean all output and intermediary files""" - # Create detailed message of what will be cleaned - clean_message = "This will delete all files in:\n\n" - clean_message += "1. edgenet-360/Data/Input/\n" - clean_message += "2. edgenet-360/Output/\n" - clean_message += "3. Temporary files\n\n" - clean_message += "Are you sure you want to proceed?" - - # Show confirmation dialog - if not messagebox.askyesno("Confirm Clean", clean_message, icon='warning'): - self.update_status("Clean operation cancelled by user") - return - - try: - # List of directories to clean - directories = [ - os.path.join(self.edge_net_dir, "Data", "Input"), - self.output_dir - ] - - # List of temp files to remove - temp_files = [ - self.config_reader.file_paths['shiftedImage'], - self.config_reader.file_paths['monoDepthImage'], - os.path.join(self.edge_net_dir, "process_output.log") # WSL output log if exists - ] - - files_removed = 0 - - # Clean directories - for directory in directories: - if os.path.exists(directory): - try: - # List all files before deleting for reporting - files = os.listdir(directory) - self.update_status(f"\nCleaning directory: {directory}") - for file in files: - file_path = os.path.join(directory, file) - try: - if os.path.isfile(file_path): - os.remove(file_path) - self.update_status(f"Removed: {file}") - files_removed += 1 - elif os.path.isdir(file_path): - shutil.rmtree(file_path) - self.update_status(f"Removed directory: {file}") - files_removed += 1 - except Exception as e: - self.update_status(f"Error removing {file}: {str(e)}") - except Exception as e: - self.update_status(f"Error cleaning directory {directory}: {str(e)}") - else: - self.update_status(f"Directory does not exist: {directory}") - - # Clean temp files - for temp_file in temp_files: - if os.path.exists(temp_file): - try: - os.remove(temp_file) - self.update_status(f"Removed temp file: {temp_file}") - files_removed += 1 - except Exception as e: - self.update_status(f"Error removing temp file {temp_file}: {str(e)}") - - # Final report - if files_removed > 0: - self.update_status(f"\nCleaning complete. Removed {files_removed} files/directories.") - messagebox.showinfo("Clean Complete", f"Successfully removed {files_removed} files/directories.") - else: - self.update_status("\nNo files found to clean.") - messagebox.showinfo("Clean Complete", "No files found to clean.") - - # Reset status indicators - self.edge_status.config(text="Not started") - self.split_status.config(text="Not started") - self.flip_status.config(text="Not started") - - # Clear previews - self.clear_previews() - - except Exception as e: - error_msg = f"Error during cleaning: {str(e)}" - self.update_status(error_msg) - messagebox.showerror("Error", error_msg) + """Update mesh preview image""" + # Implement mesh preview rendering + # This could involve taking a screenshot of the mesh + # or loading a pre-rendered preview image + pass - def clean_output_directory(self): - """Clean only the output directory""" - # Create detailed message of what will be cleaned - clean_message = "This will delete all files in:\n\n" - clean_message += "edgenet-360/Output/\n\n" - clean_message += "Are you sure you want to proceed?" - - # Show confirmation dialog - if not messagebox.askyesno("Confirm Clean Output", clean_message, icon='warning'): - self.update_status("Clean output operation cancelled by user") - return - - try: - files_removed = 0 - - if os.path.exists(self.output_dir): - try: - # List all files before deleting for reporting - files = os.listdir(self.output_dir) - - if not files: - self.update_status("\nOutput directory is already empty.") - messagebox.showinfo("Clean Output", "Output directory is already empty.") - return - - self.update_status(f"\nCleaning output directory: {self.output_dir}") - - for file in files: - file_path = os.path.join(self.output_dir, file) - try: - if os.path.isfile(file_path): - os.remove(file_path) - self.update_status(f"Removed: {file}") - files_removed += 1 - elif os.path.isdir(file_path): - shutil.rmtree(file_path) - self.update_status(f"Removed directory: {file}") - files_removed += 1 - except Exception as e: - self.update_status(f"Error removing {file}: {str(e)}") - - # Final report - self.update_status(f"\nOutput directory cleaning complete. Removed {files_removed} files/directories.") - messagebox.showinfo("Clean Output Complete", - f"Successfully removed {files_removed} files/directories from output directory.") - - # Reset relevant status indicators - self.split_status.config(text="Not started") # Reset status for operations that create output files - self.flip_status.config(text="Not started") - - # Clear mesh preview if it exists - self.mesh_preview.configure(image='') - - except Exception as e: - error_msg = f"Error cleaning output directory: {str(e)}" - self.update_status(error_msg) - messagebox.showerror("Error", error_msg) - else: - self.update_status("\nOutput directory does not exist.") - messagebox.showinfo("Clean Output", "Output directory does not exist.") - - except Exception as e: - error_msg = f"Error during output cleaning: {str(e)}" - self.update_status(error_msg) - messagebox.showerror("Error", error_msg) - - def clear_previews(self): - """Clear all preview images""" - try: - # Clear depth preview - self.depth_preview.configure(image='') - - # Clear mesh preview - self.mesh_preview.configure(image='') - - # Force update - self.tab.update_idletasks() - - except Exception as e: - self.update_status(f"Error clearing previews: {str(e)}") \ No newline at end of file + def copy_input_files(self): + """Copy manual input files to input directory""" + os.makedirs(self.input_dir, exist_ok=True) + + for file_type, path in self.manual_inputs.items(): + if path: + dest = os.path.join(self.input_dir, file_type) + copy_file(path, dest, self.update_status) diff --git a/scripts/debug_tool/utils/qt_widgets.py b/scripts/debug_tool/utils/qt_widgets.py index 6003e78..5014f16 100644 --- a/scripts/debug_tool/utils/qt_widgets.py +++ b/scripts/debug_tool/utils/qt_widgets.py @@ -1,6 +1,7 @@ # utils/qt_widgets.py from PyQt6.QtWidgets import (QTextEdit, QGroupBox, QVBoxLayout, - QHBoxLayout, QPushButton, QLabel) + QHBoxLayout, QPushButton, QLabel, + QMessageBox) from PyQt6.QtCore import Qt def create_group_with_text(title, height=100): @@ -182,4 +183,13 @@ def get_status_text(status_layout): str: Current status text """ text = status_layout.itemAt(1).widget().text() - return text.replace("✓ ", "").replace("✗ ", "") \ No newline at end of file + return text.replace("✓ ", "").replace("✗ ", "") + +def show_confirmation_dialog(parent, title, message): + """Show a confirmation dialog and return True if user clicks Yes""" + return QMessageBox.question( + parent, + title, + message, + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No + ) == QMessageBox.StandardButton.Yes \ No newline at end of file -- GitLab