diff --git a/BLE/BLE-PC.py b/BLE/BLE-PC.py new file mode 100644 index 0000000000000000000000000000000000000000..f0b7e570760c278b651b341b80fd351b45110472 --- /dev/null +++ b/BLE/BLE-PC.py @@ -0,0 +1,10 @@ +import asyncio +from bleak import BleakScanner + +async def main(): + devices = await BleakScanner.discover() + + for device in devices: + print(device) if device.name != None else None + +asyncio.run(main()) \ No newline at end of file diff --git a/BLE/BLE-RPi.py b/BLE/BLE-RPi.py new file mode 100644 index 0000000000000000000000000000000000000000..ee569d2324a0ee89de73b1af98fa9588a2a43484 --- /dev/null +++ b/BLE/BLE-RPi.py @@ -0,0 +1,31 @@ +import gatt + +class ChatPeripheral(gatt.DeviceManager): + def device_discovered(self, device): + print(f"Discovered [{device.mac_address}] {device.alias()}") + +class ChatService(gatt.Service): + def __init__(self, manager): + super().__init__(manager=manager, uuid="12345678-1234-5678-1234-56789abcdef0", primary=True) + +class ChatCharacteristic(gatt.Characteristic): + def __init__(self, service): + super().__init__( + uuid="12345678-1234-5678-1234-56789abcdef1", service=service, + flags=gatt.Characteristic.FLAG_READ | gatt.Characteristic.FLAG_WRITE + ) + self.message = "" + + def ReadValue(self, options): + print("Client is reading the value!") + return [dbus.Byte(c.encode()) for c in self.message] + + def WriteValue(self, value, options): + self.message = "".join([str(byte) for byte in value]) + print(f"Received message: {self.message}") + +manager = ChatPeripheral(adapter_name='hci0') +service = ChatService(manager) +chat_char = ChatCharacteristic(service) + +manager.run() diff --git a/BLE/BLESCannerGUI.py b/BLE/BLESCannerGUI.py new file mode 100644 index 0000000000000000000000000000000000000000..fc93dc3900fd58611a7edbf71ab1bb58126937e6 --- /dev/null +++ b/BLE/BLESCannerGUI.py @@ -0,0 +1,135 @@ +import asyncio, tkinter as tk, threading +from bleak import BleakScanner, BleakClient + +# UUID for the characteristic to interact withs +CHARACTERISTIC_UUID = '00000002-710e-4a5b-8d75-3e5b444bc3cf' +RUNNING = True + +class App(tk.Tk): + def __init__(self, *args, **kwargs): + width, height = kwargs.pop("width", 800), kwargs.pop("height", 500) + super().__init__(*args, **kwargs) + self.title("BLE app") + + self.geometry("{}x{}+{}+{}".format(width, height, (self.winfo_screenwidth() - width) // 2, (self.winfo_screenheight() - height) // 2 - 30)) + self.protocol("WM_DELETE_WINDOW", self.close) + self.tasks = [] + + self.rowconfigure((0, 1), weight = 1) + self.columnconfigure(0, weight = 1) + + listboxFrame = tk.Frame(self) + listboxFrame.grid(row = 0, column = 0, sticky = "nsew") + + self.titles = ["Devices", "Services", "Characteristics"] + self.listboxes = [] + listboxFrame.rowconfigure(1, weight = 1, uniform = "row") + listboxFrame.columnconfigure((0, 2, 4), weight = 1, uniform = "column") + for i in range(3): + tk.Label(listboxFrame, text = self.titles[i]).grid(row = 0, column = i * 2, columnspan = 2, sticky = "nsew") + listboxFrame.columnconfigure(i * 2, weight = 1) + lb = tk.Listbox(listboxFrame) + lb.grid(row = 1, column = i * 2, sticky = "nsew", rowspan = 3 if i else 1) + sb = tk.Scrollbar(listboxFrame) #scrollbar + sb.grid(row = 1, column = (i * 2) + 1, sticky = "nsew", rowspan = 3 if i else 1) + lb.config(yscrollcommand = sb.set) + sb.config(command = lb.yview) + self.listboxes.append(lb) + + self.listboxes[0].bind('<<ListboxSelect>>', self.deviceSelect) + + self.scanning = False + self.devices = [] + self.scanButton = tk.Button(listboxFrame, text = "Start scanning", command = self.pressedScanButton) + self.scanButton.grid(row = 2, column = 0, columnspan = 2, sticky = "nsew") + self.connectButton = tk.Button(listboxFrame, text = "connect", state = "disabled") + self.connectButton.grid(row = 3, column = 0, columnspan = 2, sticky = "nsew") + + self.bottomFrame = tk.Frame(self, bg = "navy") + self.bottomFrame.grid(row = 1, column = 0, sticky = "nsew") + + tk.Button(self.bottomFrame, text = "Useless button").grid(row = 0, column = 0, sticky = "nsew") + + def updateDevices(self, devices): + if self.scanning: + self.devices = devices + self.listboxes[0].delete(0, "end") + names = list(map(lambda device: device.name, devices)) + # print(names) + self.listboxes[0].insert(0, *names) + + def deviceSelect(self, event): + selection = event.widget.curselection() + if selection: + index = selection[0] + name = event.widget.get(index) + print(name) + self.connectButton.config(state = "normal") + + def pressedScanButton(self): + if not self.scanning: + self.scanning = True + self.scanButton.config(text = "Stop scanning") + else: + self.scanning = False + self.scanButton.config(text = "Start scanning") + + def close(self): + global RUNNING + RUNNING = False + self.destroy() + +root = App() + +async def scanDevices(root): + print("Scanning for BLE devices...") + devices = await BleakScanner.discover() + devices = list(filter(lambda device: device.name != None, devices)) + root.updateDevices(devices) + +async def connectToDevice(device): + print("Address: ", device.address) + async with BleakClient(device.address) as client: + services = client.services + + temp = services.get_characteristic(CHARACTERISTIC_UUID) + print(temp.properties) + print(temp.description) + +async def asyncMain(): + scanningTask = None + while RUNNING: + if root.scanning: + if scanningTask == None: + scanningTask = asyncio.create_task(scanDevices(root)) + await scanningTask + print("Created task") + else: + if scanningTask.done(): + scanningTask = None + else: + if scanningTask != None: + print("Cancel") + scanningTask.cancel() + scanningTask = None + else: + if scanningTask != None: + scanningTask.cancel() + + +# async def main(): +# # Discover BLE devices +# # device = await discover_devices() + +# # If a device is selected, connect and interact with it +# # if device: +# # await connect_and_interact(device) +# await connect_and_interact(None) + +# i = 0 + +if __name__ == "__main__": + thread = threading.Thread(target = lambda: asyncio.run(asyncMain())) + thread.start() + root.mainloop() + thread.join() \ No newline at end of file diff --git a/BLE/BLEScanner.py b/BLE/BLEScanner.py new file mode 100644 index 0000000000000000000000000000000000000000..8913d6b70c1cf9a465e1fbf9b649baa062f0f9e6 --- /dev/null +++ b/BLE/BLEScanner.py @@ -0,0 +1,72 @@ +import asyncio +from bleak import BleakScanner, BleakClient + +# UUID for the characteristic to interact with +SERVICE_UUID = '00000001-710e-4a5b-8d75-3e5b444bc3cf' +CHARACTERISTIC_UUID = '00000002-710e-4a5b-8d75-3e5b444bc3cf' + + +async def discover_devices(): + print("Scanning for BLE devices...") + devices = await BleakScanner.discover() + + if not devices: + print("No BLE devices found. Make sure the devices are in range and advertising.") + return None + + print("\nFound devices:") + devices = list(filter(lambda device: device.name != None, devices)) + for i, device in enumerate(devices): + print(f"{i}: {device.name} - {device.address}") + + # Ask the user to select a device to connect to + selected_device_index = int(input("\nSelect the device number you want to connect to: ")) + if 0 <= selected_device_index < len(devices): + return devices[selected_device_index] + else: + print("Invalid selection.") + return None + + +async def connect_and_interact(device): + # print("Address: ", device.address) + async with BleakClient("D8:3A:DD:D0:7C:61") as client: + # async with BleakClient(device.address) as client: + # print(f"\nConnected to {device.name} ({device.address})") + + # for key, item in client.services.services.items(): + # print("Services:", key, item) + # print(item.characteristics, item.description) + + # Check if the service and characteristic are available on the device + services = client.services + # print("Characteristics:") + # for key, item in services.characteristics.items(): + # print(key, "|", item) + + for key, item in services.characteristics.items(): + uuid, name, properties = item.uuid, item.description, item.properties + print(name, uuid, properties) + + temp = services.get_characteristic(CHARACTERISTIC_UUID) + print("Properties:", temp.properties) + print("Description:", temp.description) + + for i in range(1): + data = await client.read_gatt_char(CHARACTERISTIC_UUID) + print(data.decode()) + # await client.write_gatt_char("00000003-710e-4a5b-8d75-3e5b444bc3cf", input("C or F: ").encode()) + + + +async def main(): + # Discover BLE devices + # device = await discover_devices() + + # If a device is selected, connect and interact with it + # if device: + # await connect_and_interact(device) + await connect_and_interact(None) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/BLE/BLEScript.py b/BLE/BLEScript.py new file mode 100644 index 0000000000000000000000000000000000000000..ce481c76a8d961d566976ffd4847c712043fdf14 --- /dev/null +++ b/BLE/BLEScript.py @@ -0,0 +1,129 @@ +import dbus +from advertisement import Advertisement +from service import Application, Service, Characteristic, Descriptor + +# Constants +GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1" +SSID_NAMES_CHARACTERISTIC_UUID = "00000002-710e-4a5b-8d75-3e5b444bc3cf" +SSID_INFO_CHARACTERISTIC_UUID = "00000003-710e-4a5b-8d75-3e5b444bc3cf" +NOTIFY_TIMEOUT = 5000 + +class SSIDAdvertisement(Advertisement): + def __init__(self, index): + Advertisement.__init__(self, index, "peripheral") + self.add_local_name("RoboBin-BT") + self.include_tx_power = True + +class SSIDService(Service): + SSID_SERVICE_UUID = "00000001-710e-4a5b-8d75-3e5b444bc3cf" + + def __init__(self, index): + Service.__init__(self, index, self.SSID_SERVICE_UUID, True) + self.ssid_names = ["HomeNetwork", "OfficeWiFi", "GuestWiFi"] # Example SSID list + self.selectedSSID = "" + self.password = "" + self.add_characteristic(SSIDListCharacteristic(self)) + self.add_characteristic(SSIDInfoCharacteristic(self)) + + def get_ssid_names(self): + """Returns the SSID list as a string with newline separation.""" + return "\n".join(self.ssid_names) + + def set_ssid_info(self, ssidInfo): + """Sets the SSID and password.""" + self.selectedSSID, self.password = ssidInfo.split(",") + +class SSIDListCharacteristic(Characteristic): + """A read-only characteristic that returns a list of SSIDs.""" + def __init__(self, service): + Characteristic.__init__( + self, SSID_NAMES_CHARACTERISTIC_UUID, ["read"], service) + self.add_descriptor(SSIDListDescriptor(self)) + + def ReadValue(self, options): + """Returns the list of SSIDs joined by newline characters.""" + value = [] + ssid_names_str = self.service.get_ssid_names() + + for c in ssid_names_str: + value.append(dbus.Byte(c.encode())) + + return value + +class SSIDListDescriptor(Descriptor): + TEMP_DESCRIPTOR_UUID = "2901" + TEMP_DESCRIPTOR_VALUE = "SSID List" + + def __init__(self, characteristic): + Descriptor.__init__(self, self.TEMP_DESCRIPTOR_UUID, ["read"], characteristic) + + def ReadValue(self, options): + value = [] + desc = self.TEMP_DESCRIPTOR_VALUE + for c in desc: + value.append(dbus.Byte(c.encode())) + return value + +class SSIDInfoCharacteristic(Characteristic): + """A read/write characteristic for sending an SSID and password.""" + def __init__(self, service): + Characteristic.__init__(self, SSID_INFO_CHARACTERISTIC_UUID, ["read", "write", "notify"], service) #Adding "notify" to enable notifications + self.add_descriptor(SSIDDescriptor(self)) + self.notifying = False #To track if a client is receiving notifications + + def ReadValue(self, options): + """Returns the last sent SSID and password (if any).""" + value = [] + ssid_info_str = (self.service.selectedSSID + ":" + self.service.password) or "No SSID info set" + + for c in ssid_info_str: + value.append(dbus.Byte(c.encode())) + + return value + + def WriteValue(self, value, options): + """Writes an SSID and password sent from the client.""" + ssid_info = "".join([chr(b) for b in value]) # Convert bytes back to string + self.service.set_ssid_info(ssid_info) + print(f"Received SSID and password: {ssid_info}") + + def StartNotify(self): + """Called when a client subscribes to notifications.""" + if self.notifying: + return + + self.notifying = True + # If needed, start sending notifications, e.g., a periodic update like the CPU temperature example + # self.add_timeout(NOTIFY_TIMEOUT, self.some_notify_callback) + + def StopNotify(self): + """Called when a client unsubscribes from notifications.""" + self.notifying = False + +class SSIDDescriptor(Descriptor): + """Descriptor providing additional information about the SSID characteristics.""" + SSID_DESCRIPTOR_UUID = "2901" + SSID_DESCRIPTOR_VALUE = "SSID name and password" + + def __init__(self, characteristic): + Descriptor.__init__(self, self.SSID_DESCRIPTOR_UUID, ["read"], characteristic) + + def ReadValue(self, options): + value = [] + desc = self.SSID_DESCRIPTOR_VALUE + for c in desc: + value.append(dbus.Byte(c.encode())) + return value + +# Main Application +app = Application() +app.add_service(SSIDService(0)) +app.register() + +adv = SSIDAdvertisement(0) +adv.register() + +try: + app.run() +except KeyboardInterrupt: + app.quit() diff --git a/BLE/bletools.py b/BLE/bletools.py new file mode 100644 index 0000000000000000000000000000000000000000..33cc9f09dea9511faccb9f4d6aa37721e1433560 --- /dev/null +++ b/BLE/bletools.py @@ -0,0 +1,57 @@ +"""Copyright (c) 2019, Douglas Otwell + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import dbus +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject + +BLUEZ_SERVICE_NAME = "org.bluez" +LE_ADVERTISING_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1" +DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" + +class BleTools(object): + @classmethod + def get_bus(self): + bus = dbus.SystemBus() + + return bus + + @classmethod + def find_adapter(self, bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, "/"), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + + for o, props in objects.items(): + if LE_ADVERTISING_MANAGER_IFACE in props: + return o + + return None + + @classmethod + def power_adapter(self): + adapter = self.get_adapter() + + adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + "org.freedesktop.DBus.Properties"); + adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))