From 3a26eede6831978c47f8eec6c223642ab8ff9eb2 Mon Sep 17 00:00:00 2001 From: jlKronos01 <joelleungyancheuk@gmail.com> Date: Fri, 8 Nov 2024 00:54:46 +0000 Subject: [PATCH] Update BLEScript.py --- BLE/BLEScript.py | 119 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/BLE/BLEScript.py b/BLE/BLEScript.py index ce481c76..c4a263f0 100644 --- a/BLE/BLEScript.py +++ b/BLE/BLEScript.py @@ -5,8 +5,10 @@ from service import Application, Service, Characteristic, Descriptor # Constants GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1" SSID_NAMES_CHARACTERISTIC_UUID = "00000002-710e-4a5b-8d75-3e5b444bc3cf" -SSID_INFO_CHARACTERISTIC_UUID = "00000003-710e-4a5b-8d75-3e5b444bc3cf" -NOTIFY_TIMEOUT = 5000 +STATUS_CHARACTERISTIC_UUID = "00000003-710e-4a5b-8d75-3e5b444bc3cf" +COMMAND_UUID = "00000004-710e-4a5b-8d75-3e5b444bc3cf" +SSID_INFO_CHARACTERISTIC_UUID = "00000005-710e-4a5b-8d75-3e5b444bc3cf" +NOTIFY_TIMEOUT = 1000 class SSIDAdvertisement(Advertisement): def __init__(self, index): @@ -19,15 +21,24 @@ class SSIDService(Service): def __init__(self, index): Service.__init__(self, index, self.SSID_SERVICE_UUID, True) - self.ssid_names = ["HomeNetwork", "OfficeWiFi", "GuestWiFi"] # Example SSID list self.selectedSSID = "" self.password = "" + self.add_characteristic(SSIDListCharacteristic(self)) + self.add_characteristic(StatusCharacteristic(self)) + self.add_characteristic(CommandCharacteristic(self)) self.add_characteristic(SSIDInfoCharacteristic(self)) def get_ssid_names(self): """Returns the SSID list as a string with newline separation.""" - return "\n".join(self.ssid_names) + return "\n".join(["HomeNetwork", "OfficeWiFi", "GuestWiFi"]) # Example SSID list + + def get_status(self): + """Returns the status of the Wi-Fi connection.""" + return "connected or smth" #placeholder + + def commandCallback(self, command): + print("Recieved command from inside command callback: {}".format(command)) def set_ssid_info(self, ssidInfo): """Sets the SSID and password.""" @@ -37,8 +48,9 @@ class SSIDListCharacteristic(Characteristic): """A read-only characteristic that returns a list of SSIDs.""" def __init__(self, service): Characteristic.__init__( - self, SSID_NAMES_CHARACTERISTIC_UUID, ["read"], service) + self, SSID_NAMES_CHARACTERISTIC_UUID, ["read", "notify"], service) self.add_descriptor(SSIDListDescriptor(self)) + self.notifying = False #To track if a client is receiving notifications def ReadValue(self, options): """Returns the list of SSIDs joined by newline characters.""" @@ -50,6 +62,20 @@ class SSIDListCharacteristic(Characteristic): return value + def StartNotify(self): + """Called when a client subscribes to notifications.""" + if self.notifying: + return + + self.notifying = True + # NEED TO FIGURE OUT HOW TO ENABLE NOTIFICATIONS SO SSID LIST CONSTANTLY UPDATES + # self.PropertiesChanged(GATT_CHRC_IFACE, {"Value": value}, []) #Probably need this line from cputemp.py? Not sure how it works though + # self.add_timeout(NOTIFY_TIMEOUT, self.some_notify_callback) #need to enable this, but not sure what the callback should do + + def StopNotify(self): + """Called when a client unsubscribes from notifications.""" + self.notifying = False + class SSIDListDescriptor(Descriptor): TEMP_DESCRIPTOR_UUID = "2901" TEMP_DESCRIPTOR_VALUE = "SSID List" @@ -64,21 +90,84 @@ class SSIDListDescriptor(Descriptor): value.append(dbus.Byte(c.encode())) return value +class StatusCharacteristic(Characteristic): + """A read-only characteristic that returns the status of the Wi-Fi connection.""" + def __init__(self, service): + Characteristic.__init__( + self, STATUS_CHARACTERISTIC_UUID, ["read"], service) + self.add_descriptor(StatusDescriptor(self)) + + def ReadValue(self, options): + """Returns the status of Wi-Fi connection.""" + value = [] + status = self.service.get_status() + for c in status: + value.append(dbus.Byte(c.encode())) + return value + +class StatusDescriptor(Descriptor): + TEMP_DESCRIPTOR_UUID = "2901" + TEMP_DESCRIPTOR_VALUE = "Status of Wi-Fi connection" + + def __init__(self, characteristic): + Descriptor.__init__(self, self.TEMP_DESCRIPTOR_UUID, ["read"], characteristic) + + def ReadValue(self, options): + value = [] + desc = self.TEMP_DESCRIPTOR_VALUE + for c in desc: + value.append(dbus.Byte(c.encode())) + return value + +class CommandCharacteristic(Characteristic): + """A write characteristic for reading commands.""" + def __init__(self, service): + Characteristic.__init__(self, COMMAND_UUID, ["write"], service) + self.add_descriptor(CommandDescriptor(self)) + + def ReadValue(self, options): + """Returns the last sent SSID and password (if any).""" + value = [] + ssid_info_str = (self.service.selectedSSID + ":" + self.service.password) or "No SSID info set" + + for c in ssid_info_str: + value.append(dbus.Byte(c.encode())) + + return value + + def WriteValue(self, value, options): + """Recieves command from client.""" + command = "".join([chr(b) for b in value]) # Convert bytes back to string + self.service.commandCallback(command) + print(f"Received command: {command}") + +class CommandDescriptor(Descriptor): + """Descriptor providing additional information about the command characteristics.""" + SSID_DESCRIPTOR_UUID = "2901" + SSID_DESCRIPTOR_VALUE = "Command characteristic" + + def __init__(self, characteristic): + Descriptor.__init__(self, self.SSID_DESCRIPTOR_UUID, ["read"], characteristic) + + def ReadValue(self, options): + value = [] + desc = self.SSID_DESCRIPTOR_VALUE + for c in desc: + value.append(dbus.Byte(c.encode())) + return value + class SSIDInfoCharacteristic(Characteristic): """A read/write characteristic for sending an SSID and password.""" def __init__(self, service): - Characteristic.__init__(self, SSID_INFO_CHARACTERISTIC_UUID, ["read", "write", "notify"], service) #Adding "notify" to enable notifications + Characteristic.__init__(self, SSID_INFO_CHARACTERISTIC_UUID, ["read", "write"], service) self.add_descriptor(SSIDDescriptor(self)) - self.notifying = False #To track if a client is receiving notifications def ReadValue(self, options): """Returns the last sent SSID and password (if any).""" value = [] ssid_info_str = (self.service.selectedSSID + ":" + self.service.password) or "No SSID info set" - for c in ssid_info_str: value.append(dbus.Byte(c.encode())) - return value def WriteValue(self, value, options): @@ -87,18 +176,6 @@ class SSIDInfoCharacteristic(Characteristic): self.service.set_ssid_info(ssid_info) print(f"Received SSID and password: {ssid_info}") - def StartNotify(self): - """Called when a client subscribes to notifications.""" - if self.notifying: - return - - self.notifying = True - # If needed, start sending notifications, e.g., a periodic update like the CPU temperature example - # self.add_timeout(NOTIFY_TIMEOUT, self.some_notify_callback) - - def StopNotify(self): - """Called when a client unsubscribes from notifications.""" - self.notifying = False class SSIDDescriptor(Descriptor): """Descriptor providing additional information about the SSID characteristics.""" -- GitLab