diff --git a/integration/Window.py b/integration/Window.py index 61f70dbc1ce55ac7fc39e66ca4aca88340aeeca6..03dca9c99f4e1c0db8763b23510ca49054948361 100644 --- a/integration/Window.py +++ b/integration/Window.py @@ -268,6 +268,10 @@ class Window: self.gesture_label = tk.Label(self.frame3, text=f"Gesture is :") self.gesture_label.config(font=("Arial", 12)) self.gesture_label.place(relx=0.1, rely=0.6, anchor='w') + self.gesture_predict = tk.Label(self.frame3, text="") + self.gesture_predict.config(font=("Arial", 12)) + self.gesture_predict.place(relx=0.2, rely=0.7, anchor='w') + self.a, self.b = self.load_Function() @@ -435,6 +439,18 @@ class Window: self.outer_EMG_Number.config(text=f"{emg_data[0]}") self.inner_EMG_Number.config(text=f"{emg_data[1]}") + data=[emg_data[0],emg_data[1]] + predictions = self.predict(data,self.a,self.b) + ges_predictions = None + if predictions is not None: + if predictions==-1: + ges_predictions="Hand Open" + if predictions==1: + ges_predictions="Hand Closed" + if predictions==0 : + ges_predictions="Unknown" + self.gesture_predict.config(text=f"{ges_predictions}") + @@ -482,6 +498,36 @@ class Window: return self.adc_values + def load_Function(self,filename='trained.txt'): + try: + with open(filename, 'r') as file: + lines = file.readlines() + if len(lines) < 2: + raise ValueError("File content is insufficient to read the vertical line parameters.") + + a = float(lines[0].strip()) + b = float(lines[1].strip()) + print(f"a is {a}, b is {b}") + + return a,b + + except FileNotFoundError: + raise FileNotFoundError(f"The file {filename} does not exist.") + except ValueError as e: + raise ValueError(f"Error reading the file: {e}") + + def predict(self, point,a,b): + """判断点是否在垂直线的左侧或右侧""" + x, y = point + # 计算点的y值与垂直线的y值比较 + line_y = a * x + b + if y < line_y: + return -1 # 点在垂直线的左侧 + elif y > line_y: + return 1 # 点在垂直线的右侧 + else: + return 0 # 点在垂直线上(可选) + class WelcomeWindow: def __init__(self, root): self.root = root @@ -567,14 +613,14 @@ class trainingInterface: def EMG_connect_HandOpen(self): self.arduino_EMG = serial.Serial('COM5', 9600, timeout=1) gesture = "handOpen" - self.start_countdown(2) + self.start_countdown(11) self.displayAndsaveDate() def handCloseButton(self): self.arduino_EMG = serial.Serial('COM5', 9600, timeout=1) gesture = "handOpen" - self.start_countdown_close(2) + self.start_countdown_close(11) self.displayAndsaveDate() @@ -595,7 +641,7 @@ class trainingInterface: self.savedDataOpen = [] for i in self.savingData: self.savedDataOpen.append(i) - print(self.savedDataOpen) + print(f"open: {self.savedDataOpen}") self.savingData.clear() self.EMG_disconnect() @@ -609,11 +655,10 @@ class trainingInterface: self.handCloseButton.config(text="Hand Close") self.startSave = False self.savedDataClose=[] - print(self.savingData) for i in self.savingData: self.savedDataClose.append(i) self.savingData.clear() - print(self.savedDataClose) + print(f"close:{self.savedDataClose}") self.EMG_disconnect() self.trainData() @@ -692,8 +737,18 @@ class trainingInterface: ''' def trainData(self): if (self.savedDataClose !=[])and (self.savedDataOpen!=[]): - train_Data=Train_Data() - train_Data.run(self.savedDataOpen, self.savedDataClose) + vertical_line = Algorithm(self.savedDataClose, self.savedDataOpen) + print(f"垂直线方程: y = {vertical_line.a}x + {vertical_line.b}") + + with open('trained.txt', 'w') as file: + file.write(f"{vertical_line.a}\n") + file.write(f"{vertical_line.b}\n") + + test_points = [[2, 5], [3, 3], [4, 1]] + for point in test_points: + position = vertical_line.predict(point) + print(f"点 {point} 在垂直线的 {'左侧' if position == -1 else '右侧' if position == 1 else '上面/下面'}") + return vertical_line def _decode(self, serial_data): serial_string = serial_data.decode(errors="ignore") @@ -719,103 +774,69 @@ class trainingInterface: return self.adc_values -class Train_Data: - def __init__(self, num_inputs=2, num_outputs=2, num_hiddens=10, lr=0.001, num_epochs=300, batch_size=2010): - self.num_inputs = num_inputs - self.num_outputs = num_outputs - self.num_hiddens = num_hiddens - self.lr = lr - self.num_epochs = num_epochs - self.batch_size = batch_size - - self.w1 = nn.Parameter(torch.randn(num_inputs, num_hiddens, requires_grad=True)) - self.b1 = nn.Parameter(torch.zeros(num_hiddens, requires_grad=True)) - self.w2 = nn.Parameter(torch.randn(num_hiddens, num_outputs, requires_grad=True)) - self.b2 = nn.Parameter(torch.zeros(num_outputs, requires_grad=True)) - self.params = [self.w1, self.b1, self.w2, self.b2] - - self.loss = nn.CrossEntropyLoss() - self.updater = torch.optim.Adam(self.params, lr=self.lr) +class Algorithm: + def __init__(self, list1, list2): + self.a, self.b = self.calculate_line_equation(list1, list2) - def relu(self, x): - return torch.max(x, torch.zeros_like(x)) + def calculate_average(self, lst): + """计算列表中点的平均坐标""" + n = len(lst) + if n == 0: + return (0, 0) + sum_x = sum(point[0] for point in lst) + sum_y = sum(point[1] for point in lst) + return (sum_x / n, sum_y / n) - def net(self, x): - H = self.relu(x @ self.w1 + self.b1) - return H @ self.w2 + self.b2 + def calculate_line_equation(self, list1, list2): + """计算垂直线方程 y = ax + b""" + avg1 = self.calculate_average(list1) + avg2 = self.calculate_average(list2) - def dataManage(self, savedDataOpen, savedDataClose): - # Combine the data and create labels - data = savedDataOpen + savedDataClose - labels = [0] * len(savedDataOpen) + [1] * len(savedDataClose) # 0 for HandOpen, 1 for HandClose + x1, y1 = avg1 + x2, y2 = avg2 - # Convert to tensor - data_tensor = torch.tensor(data, dtype=torch.float32) - labels_tensor = torch.tensor(labels, dtype=torch.long) + # 计算斜率 + if x1 == x2: + raise ValueError("垂直线的斜率是未定义的,因为两个点在同一垂直线上。") - # Create TensorDataset - dataset = TensorDataset(data_tensor, labels_tensor) - total_size = len(dataset) - train_size = int(0.7 * total_size) - test_size = total_size - train_size + slope = (y2 - y1) / (x2 - x1) - # Split the data into training and testing sets - train_dataset, test_dataset = random_split(dataset, [train_size, test_size]) + # 垂直线的斜率是原斜率的负倒数 + perpendicular_slope = -1 / slope - train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True) - test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False) + # 使用点斜式方程 y - y1 = m(x - x1) 转换为 y = ax + b 的形式 + a = perpendicular_slope + b = y1 - a * x1 - return train_loader, test_loader + return a, b + def predict(self, point): + """判断点是否在垂直线的左侧或右侧""" + x, y = point + # 计算点的y值与垂直线的y值比较 + line_y = self.a * x + self.b + if y < line_y: + return -1 # 点在垂直线的左侧 + elif y > line_y: + return 1 # 点在垂直线的右侧 + else: + return 0 # 点在垂直线上(可选) - def train(self,savedDataOpen,savedDataClose): - train_loader, test_loader = self.dataManage(savedDataOpen,savedDataClose) - - for epoch in range(self.num_epochs): - for X, y in train_loader: - y_hat = self.net(X) - l = self.loss(y_hat, y) - self.updater.zero_grad() - l.backward() - self.updater.step() - - with torch.no_grad(): - train_loss = sum(self.loss(self.net(X), y).item() for X, y in train_loader) / len(train_loader) - print(f'epoch {epoch + 1}, train loss {train_loss:.3f}') - - self.evaluate_accuracy(test_loader) - - def evaluate_accuracy(self, test_loader): - correct = 0 - total = 0 - with torch.no_grad(): - for X, y in test_loader: - y_hat = self.net(X) - _, predicted = torch.max(y_hat, 1) - total += y.size(0) - correct += (predicted == y).sum().item() - - accuracy = correct / total - print(f'Test Accuracy: {accuracy:.4f}') - - def save_model(self, file_path='trained_mlp.pkl'): - joblib.dump(self.net, file_path) - def load_model(self, file_path='trained_mlp.pkl'): - self.loaded_model = joblib.load(file_path) - return self.loaded_model +def main(): + # 示例输入 + list1 = [[1, 2], [2, 3], [3, 4]] + list2 = [[4, 5], [5, 6], [6, 7]] - def run(self,savedDataOpen,savedDataClose): - self.train(savedDataOpen,savedDataClose) - self.save_model() + vertical_line = Algorithm(list1, list2) - def predict(self, X): - self.load_model() # Ensure the model parameters are loaded - with torch.no_grad(): - y_hat = self.net(X) - _, predicted = torch.max(y_hat, 1) - return predicted + print(f"垂直线方程: y = {vertical_line.a}x + {vertical_line.b}") + # 测试点 + test_points = [[2, 5], [3, 3], [4, 1]] + for point in test_points: + position = vertical_line.test(point) + print(f"点 {point} 在垂直线的 {'左侧' if position == -1 else '右侧' if position == 1 else '上面/下面'}") diff --git a/integration/example.py b/integration/example.py new file mode 100644 index 0000000000000000000000000000000000000000..34c1bda7a1f61d11fda11ecb15e8fada67321bf2 --- /dev/null +++ b/integration/example.py @@ -0,0 +1,142 @@ +import paddle +import numpy as np +import math +import sys + + +class EMGDataset(paddle.io.Dataset): + def __init__(self, inputs, labels): + self.inputs = np.array(inputs, dtype='float32') # 确保是 float32 + self.labels = np.array(labels, dtype='int64') # 确保是 int64 + + def __getitem__(self, index): + return self.inputs[index], self.labels[index] + + def __len__(self): + return len(self.inputs) + + +class InitialEMGTraining: + def __init__(self, class_0_list, class_1_list, num_epochs=20, learning_rate=0.01, model_path="./my_paddle_model"): + self.class_0_list = class_0_list + self.class_1_list = class_1_list + self.num_epochs = num_epochs + self.learning_rate = learning_rate + self.model_path = model_path + self._prepare_data() + self._build_model() + + def _prepare_data(self): + # 合并数据并创建标签 + self.input_list = np.array(self.class_0_list + self.class_1_list, dtype='float32') # 确保是 float32 + self.label_list = [0] * len(self.class_0_list) + [1] * len(self.class_1_list) + + # 数据标准化 + mean = np.mean(self.input_list, axis=0) + std = np.std(self.input_list, axis=0) + self.input_list = (self.input_list - mean) / std + + # 创建数据集和数据加载器 + dataset = EMGDataset(self.input_list, self.label_list) + self.train_loader = paddle.io.DataLoader(dataset, batch_size=16, shuffle=True) + self.test_loader = paddle.io.DataLoader(dataset, batch_size=16, shuffle=False) + + def _build_model(self): + print("Network") + self.model = paddle.nn.Sequential( + paddle.nn.Linear(2, 10), + paddle.nn.ReLU(), + paddle.nn.Linear(10, 2) + ) + + self.criterion = paddle.nn.CrossEntropyLoss() + self.optimizer = paddle.optimizer.Adam(learning_rate=self.learning_rate, parameters=self.model.parameters()) + + def _train_epoch(self): + step = 0 + print("Start training") + for pass_id in range(self.num_epochs): + for features, labels in self.train_loader: + step += 1 + features = paddle.to_tensor(features, dtype='float32') # 确保是 float32 + labels = paddle.to_tensor(labels, dtype='int64') # 确保是 int64 + + # Forward pass + logits = self.model(features) + loss = self.criterion(logits, labels) + + # Backward pass and optimization + loss.backward() + self.optimizer.step() + self.optimizer.clear_grad() + + if step % 10 == 0: + print(f"Train cost, Step {step}, Loss {loss.numpy()}") + + if step % 10 == 0: + test_metrics = self._evaluate() + print(f"Test cost, Step {step}, Loss {test_metrics[0]}") + if test_metrics[0] < 0.1: + return + + if math.isnan(float(loss.numpy())): + raise ValueError("Got NaN loss, training failed.") + + def _evaluate(self): + total_loss = 0 + count = 0 + for features, labels in self.test_loader: + features = paddle.to_tensor(features, dtype='float32') # 确保是 float32 + labels = paddle.to_tensor(labels, dtype='int64') # 确保是 int64 + + logits = self.model(features) + loss = self.criterion(logits, labels) + total_loss += loss.numpy() + count += 1 + + return [total_loss / count] + + def train(self): + self._train_epoch() + # 保存训练后的模型 + if self.model_path: + paddle.save(self.model.state_dict(), self.model_path) + + def predict(self, new_data): + assert len(new_data[0]) == 2, "每个输入样本应包含两个特征。" + new_data = np.array(new_data, dtype='float32') # 确保是 float32 + # 数据标准化 + mean = np.mean(self.input_list, axis=0) + std = np.std(self.input_list, axis=0) + new_data = (new_data - mean) / std + + # 确保模型在评估模式下 + self.model.eval() + with paddle.no_grad(): + predictions = self.model(paddle.to_tensor(new_data, dtype='float32')) # 确保是 float32 + return np.argmax(predictions.numpy(), axis=1) + + +# 示例用法 +if __name__ == "__main__": + class_0_list = [[27, 18], [11, 8], [11, 7], [6, 5], [9, 4], [10, 3], [9, 4], [7, 4], [8, 3], [10, 5], [9, 3], + [7, 4], [9, 4], [7, 4], [7, 4], [9, 5], [12, 3], [9, 4], [7, 4], [10, 3], [10, 4], [9, 4], [10, 4], + [9, 3], [9, 4], [8, 4], [10, 3], [12, 4], [10, 4], [9, 3], [9, 5], [12, 4], [10, 4], [11, 4], + [11, 4], [7, 4], [9, 4], [8, 4], [7, 4], [10, 3], [9, 4], [17, 5], [9, 4], [10, 4], [8, 5], [13, 4], + [10, 5], [9, 4], [9, 4], [13, 3], [7, 4], [9, 5], [13, 4], [8, 4], [10, 3], [9, 3], [10, 4], + [10, 4], [12, 4], [7, 4], [9, 4], [7, 4], [11, 5], [7, 4], [10, 4], [8, 5], [8, 4], [6, 5], [9, 4], + [8, 3], [9, 3], [6, 5], [11, 4], [8, 4], [8, 4], [7, 4]] + class_1_list = [[0, 1449], [25, 20], [27, 22], [26, 19], [30, 20], [19, 33], [29, 48], [17, 17], [16, 27], [18, 19], + [20, 22], [18, 18], [18, 18], [0, 4], [24, 26], [30, 26], [19, 20], [21, 18], [21, 16], [17, 12], + [18, 27], [15, 9], [25, 20], [27, 22], [26, 19], [30, 20], [19, 33], [29, 48], [17, 17], [16, 27], + [18, 19], [20, 22], [18, 18], [18, 18], [24, 25], [25, 13], [25, 18], [11, 17], [18, 10], [22, 9], + [13, 15], [19, 11], [23, 14], [15, 17], [19, 12], [22, 16], [19, 10], [18, 12], [20, 16], [19, 11], + [17, 11], [18, 8], [20, 10], [14, 15], [22, 13], [27, 13], [22, 15], [24, 13], [18, 14], [18, 14], + [17, 12], [25, 17], [15, 17]] + + trainer = InitialEMGTraining(class_0_list, class_1_list) + trainer.train() + + new_samples = [[25, 20], [11, 8], [27, 22], [14, 15]] # 示例新样本 + predictions = trainer.predict(new_samples) + print("Predictions:", predictions) diff --git a/integration/try1.py b/integration/try1.py new file mode 100644 index 0000000000000000000000000000000000000000..c7527e8fdaeb3bfefa49874acbba2815826e317a --- /dev/null +++ b/integration/try1.py @@ -0,0 +1,64 @@ +def calculate_line_equation(list1, list2): + # 计算两个点的平均值 + def calculate_average(lst): + n = len(lst) + if n == 0: + return (0, 0) + sum_x = sum(point[0] for point in lst) + sum_y = sum(point[1] for point in lst) + return (sum_x / n, sum_y / n) + + avg1 = calculate_average(list1) + avg2 = calculate_average(list2) + + x1, y1 = avg1 + x2, y2 = avg2 + + # 计算斜率 + if x1 == x2: + raise ValueError("垂直线的斜率是未定义的,因为两个点在同一垂直线上。") + + slope = (y2 - y1) / (x2 - x1) + + # 垂直线的斜率是原斜率的负倒数 + perpendicular_slope = -1 / slope + + # 使用点斜式方程 y - y1 = m(x - x1) 转换为 y = ax + b 的形式 + a = perpendicular_slope + b = y1 - a * x1 + + return a, b + + +def test(point, a, b): + x, y = point + # 计算点的y值与垂直线的y值比较 + # 点 (x, y) 在垂直线 y = ax + b 的左侧还是右侧 + line_y = a * x + b + if y < line_y: + return -1 # 点在垂直线的左侧 + elif y > line_y: + return 1 # 点在垂直线的右侧 + else: + return 0 # 点在垂直线上(可选) + + +def main(): + # 示例输入 + list1 = [[1, 2], [2, 3], [3, 4]] + list2 = [[4, 5], [5, 6], [6, 7]] + + a, b = calculate_line_equation(list1, list2) + + print(f"垂直线方程: y = {a}x + {b}") + + # 测试点 + test_points = [[2, 5], [3, 3], [4, 1]] + for point in test_points: + position = test(point, a, b) + print(f"点 {point} 在垂直线的 {'左侧' if position == -1 else '右侧' if position == 1 else '上面/下面'}") + + +if __name__ == "__main__": + main() + #y = -0.5933988423971516x + 70.32174798709765