Skip to content
Snippets Groups Projects

GDP 4.2.10 - Adding Debug mode to GUI.py to run individual modules

Merged mhby1g21 requested to merge GDP-4.2.10 into master
1 file
+ 39
3
Compare changes
  • Side-by-side
  • Inline
+ 518
0
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QCheckBox, QProgressBar, QTextEdit,
QMessageBox, QFileDialog)
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt6.QtGui import QPixmap
import os
import threading
import queue
import time
import sys
from utils.file_handlers import (select_file, run_command, clean_directory,
copy_file)
from utils.image_handlers import update_preview, clear_previews
from utils.qt_widgets import (create_group_with_text, create_button_layout,
create_preview_group, create_status_label,
update_status_indicator, show_confirmation_dialog)
class ProcessThread(QThread):
finished = pyqtSignal(bool, str)
progress = pyqtSignal(str)
def __init__(self, func, *args, **kwargs):
super().__init__()
self.func = func
self.args = args
self.kwargs = kwargs
self.running = True
def run(self):
try:
if self.running:
result = self.func(*self.args, **self.kwargs)
self.finished.emit(True, "")
except Exception as e:
self.finished.emit(False, str(e))
def stop(self):
self.running = False
self.wait()
class EdgeNetTab(QWidget):
def __init__(self, config_reader):
super().__init__()
self.config_reader = config_reader
self.setup_paths()
self.init_variables()
self.init_ui()
self.connect_signals()
self.current_thread = None
def closeEvent(self, event):
"""Handle widget close event"""
if self.current_thread and self.current_thread.isRunning():
self.current_thread.stop()
self.current_thread.wait()
event.accept()
def setup_paths(self):
"""Initialize directory and file paths"""
self.edge_net_dir = self.config_reader.directories['edgeNetDir']
self.output_dir = self.config_reader.directories['outputDir']
self.input_dir = os.path.join(self.edge_net_dir, "Data", "Input")
def init_variables(self):
"""Initialize class variables"""
self.include_top = False
self.is_processing = False
self.manual_inputs = {
'depth_e.png': None,
'rgb.png': None,
'material.png': None
}
self.output_queue = queue.Queue()
def init_ui(self):
"""Initialize user interface"""
layout = QVBoxLayout()
# Split into left and right sections
hlayout = QHBoxLayout()
left_panel = self.create_left_panel()
right_panel = self.create_right_panel()
hlayout.addWidget(left_panel)
hlayout.addWidget(right_panel)
layout.addLayout(hlayout)
# Initialize progress bar to stopped state
self.progress_bar.setMaximum(100)
self.progress_bar.setValue(0)
self.setLayout(layout)
def create_left_panel(self):
"""Create left control panel"""
widget = QWidget()
layout = QVBoxLayout()
# Input files section
files_group = self.create_input_files_group()
layout.addWidget(files_group)
# Include top checkbox with Qt6 state
self.top_checkbox = QCheckBox("Include Top in Mesh")
self.top_checkbox.setCheckState(Qt.CheckState.Unchecked)
self.top_checkbox.stateChanged.connect(self.on_include_top_changed)
layout.addWidget(self.top_checkbox)
# Status section
self.status_group, self.status_text = create_group_with_text("Status")
layout.addWidget(self.status_group)
# Progress section
progress_group = self.create_progress_group()
layout.addWidget(progress_group)
# Control buttons
button_layout = self.create_control_buttons()
layout.addLayout(button_layout)
widget.setLayout(layout)
return widget
def create_input_files_group(self):
"""Create input files selection group"""
group = QWidget()
layout = QVBoxLayout()
# File selection buttons
for file_type in self.manual_inputs.keys():
row = QHBoxLayout()
row.addWidget(QLabel(f"{file_type}:"))
label = QLabel("Using default")
setattr(self, f"{file_type.split('.')[0]}_label", label)
row.addWidget(label)
btn = QPushButton("Select")
btn.clicked.connect(lambda checked, ft=file_type: self.select_input_file(ft))
row.addWidget(btn)
layout.addLayout(row)
# Reset button
reset_btn = QPushButton("Reset to Default Inputs")
reset_btn.clicked.connect(self.reset_inputs)
layout.addWidget(reset_btn)
group.setLayout(layout)
return group
def create_progress_group(self):
"""Create progress indicators group"""
group = QWidget()
layout = QVBoxLayout()
# Status indicators
self.edge_status = create_status_label("EdgeNet:")
self.split_status = create_status_label("Mesh Split:")
self.flip_status = create_status_label("Blender Flip:")
layout.addLayout(self.edge_status)
layout.addLayout(self.split_status)
layout.addLayout(self.flip_status)
# Progress bar
self.progress_bar = QProgressBar()
layout.addWidget(self.progress_bar)
# Operation label
self.operation_label = QLabel()
layout.addWidget(self.operation_label)
group.setLayout(layout)
return group
def create_right_panel(self):
"""Create right preview panel"""
widget = QWidget()
layout = QVBoxLayout()
# Preview groups
depth_group, self.depth_preview = create_preview_group("Enhanced Depth")
mesh_group, self.mesh_preview = create_preview_group("Generated Mesh")
layout.addWidget(depth_group)
layout.addWidget(mesh_group)
widget.setLayout(layout)
return widget
def create_control_buttons(self):
"""Create control buttons layout"""
return create_button_layout(
("Run EdgeNet", self.run_edge_net, 'left'),
("Run Mesh Split", self.run_mesh_split, 'left'),
("Run Blender Flip", self.run_blender_flip, 'left'),
("Run All Steps", self.run_all_steps, 'left'),
("Clean Output", self.clean_output_directory, 'right'),
("Clean All", self.clean_all_files, 'right')
)
def connect_signals(self):
"""Connect signals and slots"""
# Add any additional signal connections here
pass
def select_input_file(self, file_type):
"""Handle input file selection"""
file_path = select_file(self, "Select Input File", "Image Files (*.png)", initial_dir=self.input_dir)
if file_path:
self.manual_inputs[file_type] = file_path
label = getattr(self, f"{file_type.split('.')[0]}_label")
label.setText(os.path.basename(file_path))
self.update_status(f"Selected {file_type}: {file_path}")
def reset_inputs(self):
"""Reset input selections to default"""
self.manual_inputs = {k: None for k in self.manual_inputs}
for file_type in self.manual_inputs:
label = getattr(self, f"{file_type.split('.')[0]}_label")
label.setText("Using default")
self.update_status("Reset all inputs to default")
def run_edge_net(self):
"""Run EdgeNet processing"""
if self.is_processing:
QMessageBox.warning(self, "Warning", "A process is already running!")
return
self.is_processing = True
self.progress_bar.setMaximum(0)
self.current_thread = ProcessThread(self._run_edge_net_process)
self.current_thread.finished.connect(self.on_edge_net_complete)
self.current_thread.progress.connect(self.update_status)
self.current_thread.start()
def clean_output_directory(self):
if show_confirmation_dialog(self, "Confirm Clean", "Clean output directory?"):
if clean_directory(self.output_dir, self.update_status):
self.update_status("Output directory cleaned")
update_status_indicator(self.split_status, "Not started")
update_status_indicator(self.flip_status, "Not started")
clear_previews(self.mesh_preview)
def clean_all_files(self):
if show_confirmation_dialog(self, "Confirm Clean", "Clean all files?"):
directories = [self.input_dir, self.output_dir]
for directory in directories:
clean_directory(directory, self.update_status)
# remove monodepth image copied in 360monodepth if exists
if os.path.exists(os.path.join(self.config_reader.directories['monoDepthDir'], 'rgb.jpg')):
monodepth_image = os.path.join(self.config_reader.directories['monoDepthDir'], 'rgb.jpg')
os.remove(monodepth_image)
# remove shifted image from shifter if exists
if os.path.exists(os.path.join(self.config_reader.directories['scriptDir'], 'shifted_t.png')):
shifted_image = os.path.join(self.config_reader.directories['scriptDir'], 'shifted_t.png')
os.remove(shifted_image)
update_status_indicator(self.edge_status, "Not started")
update_status_indicator(self.split_status, "Not started")
update_status_indicator(self.flip_status, "Not started")
clear_previews(self.depth_preview, self.mesh_preview)
def update_status(self, message):
"""Update status text"""
self.status_text.append(message)
def on_include_top_changed(self, state):
"""Handle include top checkbox change"""
# In PyQt6, CheckState.Checked has value 2
self.include_top = (state == 2) # or state == Qt.CheckState.Checked
self.update_status(f"Include top changed to: {self.include_top}")
def verify_inputs(self):
"""Verify input files exist"""
required_files = {
'depth_e.png': self.get_input_path('depth_e.png'),
'rgb.png': self.get_input_path('rgb.png')
}
missing = [f for f, p in required_files.items() if not os.path.exists(p)]
if missing:
self.update_status("Missing required files: " + ", ".join(missing))
return False
return True
def get_input_path(self, file_type):
"""Get path for input file"""
return self.manual_inputs.get(file_type) or os.path.join(self.input_dir, file_type)
def _run_edge_net_process(self):
"""Run EdgeNet processing"""
# Change to EdgeNet directory
os.chdir(self.edge_net_dir)
try:
if not self.verify_inputs():
self.copy_input_files()
if not self.verify_inputs():
raise Exception("Missing required input files")
# Run enhance360.py
enhance_cmd = self._build_enhance_command()
if run_command(self, enhance_cmd, self.update_status)[0]:
# Run infer360.py
infer_cmd = self._build_infer_command()
return run_command(self, infer_cmd, self.update_status)[0]
return False
except Exception as e:
self.update_status(f"Error: {str(e)}")
return False
def _build_enhance_command(self):
"""Build enhance360.py command"""
return (f'wsl bash -c "source {self.config_reader.config["wslAnacondaDir"]}/activate'
f' {self.config_reader.config["edgeNetEnv"]} && '
f'python enhance360.py Input depth_e.png rgb.png enhanced_depth_e.png"')
def _build_infer_command(self):
"""Build infer360.py command"""
# Debug print to verify include_top state
self.update_status(f"Current include_top state: {self.include_top}")
base_cmd = (f'wsl bash -c "source {self.config_reader.config["wslAnacondaDir"]}/activate'
f' {self.config_reader.config["edgeNetEnv"]} && '
f'python infer360.py Input enhanced_depth_e.png material.png rgb.png Input')
if self.include_top:
command = base_cmd + ' --include_top y"'
else:
command = base_cmd + '"'
# Log final command
self.update_status(f"Final command: {command}")
return command
def on_edge_net_complete(self, success, error_message):
"""Handle EdgeNet completion"""
self.is_processing = False
self.progress_bar.setMaximum(100)
if success:
update_status_indicator(self.edge_status, "Complete")
self.update_depth_preview()
else:
update_status_indicator(self.edge_status, "Failed")
QMessageBox.critical(self, "Error", f"EdgeNet failed: {error_message}")
def update_depth_preview(self):
"""Update depth preview image"""
depth_path = os.path.join(self.input_dir, "enhanced_depth_e.png")
update_preview(self.depth_preview, depth_path, 300)
def run_mesh_split(self):
if self.is_processing:
QMessageBox.warning(self, "Warning", "A process is already running!")
return
self.is_processing = True
self.progress_bar.setMaximum(0)
self.current_thread = ProcessThread(self._run_mesh_split_process)
self.current_thread.finished.connect(self.on_mesh_split_complete)
self.current_thread.progress.connect(self.update_status)
self.current_thread.start()
def _run_mesh_split_process(self):
"""Execute mesh splitting process"""
# Change to EdgeNet directory
os.chdir(self.edge_net_dir)
try:
if not os.path.exists(os.path.join(self.output_dir, "Input_prediction.obj")):
raise Exception("Missing Input_prediction.obj file")
cmd = (f'call "{self.config_reader.config["condaDir"]}\\Scripts\\activate.bat" '
f'{self.config_reader.config["materialEnv"]} && '
f'python replace.py')
return run_command(self, cmd, self.update_status)[0]
except Exception as e:
self.update_status(f"Error: {str(e)}")
return False
def on_mesh_split_complete(self, success, error_message):
"""Handle mesh split completion"""
self.is_processing = False
self.progress_bar.setMaximum(100)
if success:
update_status_indicator(self.split_status, "Complete")
self.update_mesh_preview()
else:
update_status_indicator(self.split_status, "Failed")
QMessageBox.critical(self, "Error", f"Mesh splitting failed: {error_message}")
def run_blender_flip(self):
if self.is_processing:
QMessageBox.warning(self, "Warning", "A process is already running!")
return
self.is_processing = True
self.progress_bar.setMaximum(0)
self.current_thread = ProcessThread(self._run_blender_flip_process)
self.current_thread.finished.connect(self.on_blender_flip_complete)
self.current_thread.progress.connect(self.update_status)
self.current_thread.start()
def _run_blender_flip_process(self):
"""Execute Blender flip process"""
# Change to scripts directory
os.chdir(self.config_reader.directories['scriptDir'])
try:
mesh_path = os.path.join(self.output_dir, "Input_prediction_mesh.obj")
if not os.path.exists(mesh_path):
raise Exception("Missing Input_prediction_mesh.obj file")
cmd = (f'call "{self.config_reader.config["condaDir"]}\\Scripts\\activate.bat" '
f'{self.config_reader.config["unityEnv"]} && '
f'python blenderFlip.py "{mesh_path}"')
return run_command(self, cmd, self.update_status)[0]
except Exception as e:
self.update_status(f"Error: {str(e)}")
return False
def on_blender_flip_complete(self, success, error_message):
"""Handle Blender flip completion"""
self.is_processing = False
self.progress_bar.setMaximum(100)
if success:
update_status_indicator(self.flip_status, "Complete")
self.update_mesh_preview()
else:
update_status_indicator(self.flip_status, "Failed")
QMessageBox.critical(self, "Error", f"Blender flip failed: {error_message}")
def run_all_steps(self):
if self.is_processing:
QMessageBox.warning(self, "Warning", "A process is already running!")
return
self.is_processing = True
self.progress_bar.setMaximum(0) # Set to indeterminate mode
self.current_thread = ProcessThread(self._run_all_steps_process)
self.current_thread.finished.connect(self.on_all_steps_complete)
self.current_thread.progress.connect(self.update_status)
self.current_thread.start()
def _run_all_steps_process(self):
"""Execute complete pipeline process"""
try:
self.update_status("Starting complete pipeline processing...")
# Run EdgeNet
self.update_status("Running EdgeNet...")
update_status_indicator(self.edge_status, "Running")
if not self._run_edge_net_process():
raise Exception("EdgeNet processing failed")
# Run Mesh Split
self.update_status("Running Mesh Split...")
update_status_indicator(self.split_status, "Running")
if not self._run_mesh_split_process():
raise Exception("Mesh splitting failed")
# Run Blender Flip
self.update_status("Running Blender Flip...")
update_status_indicator(self.flip_status, "Running")
if not self._run_blender_flip_process():
raise Exception("Blender flip failed")
return True
except Exception as e:
self.update_status(f"Pipeline error: {str(e)}")
return False
def on_all_steps_complete(self, success, error_message):
"""Handle complete pipeline completion"""
self.is_processing = False
self.progress_bar.setMaximum(100)
self.progress_bar.setValue(0)
if success:
self.update_status("Complete pipeline processing finished successfully!")
update_status_indicator(self.edge_status, "Complete")
update_status_indicator(self.split_status, "Complete")
update_status_indicator(self.flip_status, "Complete")
QMessageBox.information(self, "Success",
"All processing steps completed successfully!")
else:
self.update_status(f"Pipeline failed: {error_message}")
QMessageBox.critical(self, "Error", f"Pipeline failed: {error_message}")
def update_mesh_preview(self):
"""Update mesh preview image"""
# Implement mesh preview rendering
# This could involve taking a screenshot of the mesh
# or loading a pre-rendered preview image
pass
def copy_input_files(self):
"""Copy manual input files to input directory"""
os.makedirs(self.input_dir, exist_ok=True)
for file_type, path in self.manual_inputs.items():
if path:
dest = os.path.join(self.input_dir, file_type)
copy_file(path, dest, self.update_status)
Loading