diff --git a/BLE/BLE-PC.py b/BLE/BLE-PC.py new file mode 100644 index 0000000000000000000000000000000000000000..f0b7e570760c278b651b341b80fd351b45110472 --- /dev/null +++ b/BLE/BLE-PC.py @@ -0,0 +1,10 @@ +import asyncio +from bleak import BleakScanner + +async def main(): + devices = await BleakScanner.discover() + + for device in devices: + print(device) if device.name != None else None + +asyncio.run(main()) \ No newline at end of file diff --git a/BLE/BLE-RPi.py b/BLE/BLE-RPi.py new file mode 100644 index 0000000000000000000000000000000000000000..ee569d2324a0ee89de73b1af98fa9588a2a43484 --- /dev/null +++ b/BLE/BLE-RPi.py @@ -0,0 +1,31 @@ +import gatt + +class ChatPeripheral(gatt.DeviceManager): + def device_discovered(self, device): + print(f"Discovered [{device.mac_address}] {device.alias()}") + +class ChatService(gatt.Service): + def __init__(self, manager): + super().__init__(manager=manager, uuid="12345678-1234-5678-1234-56789abcdef0", primary=True) + +class ChatCharacteristic(gatt.Characteristic): + def __init__(self, service): + super().__init__( + uuid="12345678-1234-5678-1234-56789abcdef1", service=service, + flags=gatt.Characteristic.FLAG_READ | gatt.Characteristic.FLAG_WRITE + ) + self.message = "" + + def ReadValue(self, options): + print("Client is reading the value!") + return [dbus.Byte(c.encode()) for c in self.message] + + def WriteValue(self, value, options): + self.message = "".join([str(byte) for byte in value]) + print(f"Received message: {self.message}") + +manager = ChatPeripheral(adapter_name='hci0') +service = ChatService(manager) +chat_char = ChatCharacteristic(service) + +manager.run() diff --git a/BLE/BLESCannerGUI.py b/BLE/BLESCannerGUI.py new file mode 100644 index 0000000000000000000000000000000000000000..fc93dc3900fd58611a7edbf71ab1bb58126937e6 --- /dev/null +++ b/BLE/BLESCannerGUI.py @@ -0,0 +1,135 @@ +import asyncio, tkinter as tk, threading +from bleak import BleakScanner, BleakClient + +# UUID for the characteristic to interact withs +CHARACTERISTIC_UUID = '00000002-710e-4a5b-8d75-3e5b444bc3cf' +RUNNING = True + +class App(tk.Tk): + def __init__(self, *args, **kwargs): + width, height = kwargs.pop("width", 800), kwargs.pop("height", 500) + super().__init__(*args, **kwargs) + self.title("BLE app") + + self.geometry("{}x{}+{}+{}".format(width, height, (self.winfo_screenwidth() - width) // 2, (self.winfo_screenheight() - height) // 2 - 30)) + self.protocol("WM_DELETE_WINDOW", self.close) + self.tasks = [] + + self.rowconfigure((0, 1), weight = 1) + self.columnconfigure(0, weight = 1) + + listboxFrame = tk.Frame(self) + listboxFrame.grid(row = 0, column = 0, sticky = "nsew") + + self.titles = ["Devices", "Services", "Characteristics"] + self.listboxes = [] + listboxFrame.rowconfigure(1, weight = 1, uniform = "row") + listboxFrame.columnconfigure((0, 2, 4), weight = 1, uniform = "column") + for i in range(3): + tk.Label(listboxFrame, text = self.titles[i]).grid(row = 0, column = i * 2, columnspan = 2, sticky = "nsew") + listboxFrame.columnconfigure(i * 2, weight = 1) + lb = tk.Listbox(listboxFrame) + lb.grid(row = 1, column = i * 2, sticky = "nsew", rowspan = 3 if i else 1) + sb = tk.Scrollbar(listboxFrame) #scrollbar + sb.grid(row = 1, column = (i * 2) + 1, sticky = "nsew", rowspan = 3 if i else 1) + lb.config(yscrollcommand = sb.set) + sb.config(command = lb.yview) + self.listboxes.append(lb) + + self.listboxes[0].bind('<<ListboxSelect>>', self.deviceSelect) + + self.scanning = False + self.devices = [] + self.scanButton = tk.Button(listboxFrame, text = "Start scanning", command = self.pressedScanButton) + self.scanButton.grid(row = 2, column = 0, columnspan = 2, sticky = "nsew") + self.connectButton = tk.Button(listboxFrame, text = "connect", state = "disabled") + self.connectButton.grid(row = 3, column = 0, columnspan = 2, sticky = "nsew") + + self.bottomFrame = tk.Frame(self, bg = "navy") + self.bottomFrame.grid(row = 1, column = 0, sticky = "nsew") + + tk.Button(self.bottomFrame, text = "Useless button").grid(row = 0, column = 0, sticky = "nsew") + + def updateDevices(self, devices): + if self.scanning: + self.devices = devices + self.listboxes[0].delete(0, "end") + names = list(map(lambda device: device.name, devices)) + # print(names) + self.listboxes[0].insert(0, *names) + + def deviceSelect(self, event): + selection = event.widget.curselection() + if selection: + index = selection[0] + name = event.widget.get(index) + print(name) + self.connectButton.config(state = "normal") + + def pressedScanButton(self): + if not self.scanning: + self.scanning = True + self.scanButton.config(text = "Stop scanning") + else: + self.scanning = False + self.scanButton.config(text = "Start scanning") + + def close(self): + global RUNNING + RUNNING = False + self.destroy() + +root = App() + +async def scanDevices(root): + print("Scanning for BLE devices...") + devices = await BleakScanner.discover() + devices = list(filter(lambda device: device.name != None, devices)) + root.updateDevices(devices) + +async def connectToDevice(device): + print("Address: ", device.address) + async with BleakClient(device.address) as client: + services = client.services + + temp = services.get_characteristic(CHARACTERISTIC_UUID) + print(temp.properties) + print(temp.description) + +async def asyncMain(): + scanningTask = None + while RUNNING: + if root.scanning: + if scanningTask == None: + scanningTask = asyncio.create_task(scanDevices(root)) + await scanningTask + print("Created task") + else: + if scanningTask.done(): + scanningTask = None + else: + if scanningTask != None: + print("Cancel") + scanningTask.cancel() + scanningTask = None + else: + if scanningTask != None: + scanningTask.cancel() + + +# async def main(): +# # Discover BLE devices +# # device = await discover_devices() + +# # If a device is selected, connect and interact with it +# # if device: +# # await connect_and_interact(device) +# await connect_and_interact(None) + +# i = 0 + +if __name__ == "__main__": + thread = threading.Thread(target = lambda: asyncio.run(asyncMain())) + thread.start() + root.mainloop() + thread.join() \ No newline at end of file diff --git a/BLE/BLEScanner.py b/BLE/BLEScanner.py new file mode 100644 index 0000000000000000000000000000000000000000..8913d6b70c1cf9a465e1fbf9b649baa062f0f9e6 --- /dev/null +++ b/BLE/BLEScanner.py @@ -0,0 +1,72 @@ +import asyncio +from bleak import BleakScanner, BleakClient + +# UUID for the characteristic to interact with +SERVICE_UUID = '00000001-710e-4a5b-8d75-3e5b444bc3cf' +CHARACTERISTIC_UUID = '00000002-710e-4a5b-8d75-3e5b444bc3cf' + + +async def discover_devices(): + print("Scanning for BLE devices...") + devices = await BleakScanner.discover() + + if not devices: + print("No BLE devices found. Make sure the devices are in range and advertising.") + return None + + print("\nFound devices:") + devices = list(filter(lambda device: device.name != None, devices)) + for i, device in enumerate(devices): + print(f"{i}: {device.name} - {device.address}") + + # Ask the user to select a device to connect to + selected_device_index = int(input("\nSelect the device number you want to connect to: ")) + if 0 <= selected_device_index < len(devices): + return devices[selected_device_index] + else: + print("Invalid selection.") + return None + + +async def connect_and_interact(device): + # print("Address: ", device.address) + async with BleakClient("D8:3A:DD:D0:7C:61") as client: + # async with BleakClient(device.address) as client: + # print(f"\nConnected to {device.name} ({device.address})") + + # for key, item in client.services.services.items(): + # print("Services:", key, item) + # print(item.characteristics, item.description) + + # Check if the service and characteristic are available on the device + services = client.services + # print("Characteristics:") + # for key, item in services.characteristics.items(): + # print(key, "|", item) + + for key, item in services.characteristics.items(): + uuid, name, properties = item.uuid, item.description, item.properties + print(name, uuid, properties) + + temp = services.get_characteristic(CHARACTERISTIC_UUID) + print("Properties:", temp.properties) + print("Description:", temp.description) + + for i in range(1): + data = await client.read_gatt_char(CHARACTERISTIC_UUID) + print(data.decode()) + # await client.write_gatt_char("00000003-710e-4a5b-8d75-3e5b444bc3cf", input("C or F: ").encode()) + + + +async def main(): + # Discover BLE devices + # device = await discover_devices() + + # If a device is selected, connect and interact with it + # if device: + # await connect_and_interact(device) + await connect_and_interact(None) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/BLE/BLEScript.py b/BLE/BLEScript.py new file mode 100644 index 0000000000000000000000000000000000000000..ce481c76a8d961d566976ffd4847c712043fdf14 --- /dev/null +++ b/BLE/BLEScript.py @@ -0,0 +1,129 @@ +import dbus +from advertisement import Advertisement +from service import Application, Service, Characteristic, Descriptor + +# Constants +GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1" +SSID_NAMES_CHARACTERISTIC_UUID = "00000002-710e-4a5b-8d75-3e5b444bc3cf" +SSID_INFO_CHARACTERISTIC_UUID = "00000003-710e-4a5b-8d75-3e5b444bc3cf" +NOTIFY_TIMEOUT = 5000 + +class SSIDAdvertisement(Advertisement): + def __init__(self, index): + Advertisement.__init__(self, index, "peripheral") + self.add_local_name("RoboBin-BT") + self.include_tx_power = True + +class SSIDService(Service): + SSID_SERVICE_UUID = "00000001-710e-4a5b-8d75-3e5b444bc3cf" + + def __init__(self, index): + Service.__init__(self, index, self.SSID_SERVICE_UUID, True) + self.ssid_names = ["HomeNetwork", "OfficeWiFi", "GuestWiFi"] # Example SSID list + self.selectedSSID = "" + self.password = "" + self.add_characteristic(SSIDListCharacteristic(self)) + self.add_characteristic(SSIDInfoCharacteristic(self)) + + def get_ssid_names(self): + """Returns the SSID list as a string with newline separation.""" + return "\n".join(self.ssid_names) + + def set_ssid_info(self, ssidInfo): + """Sets the SSID and password.""" + self.selectedSSID, self.password = ssidInfo.split(",") + +class SSIDListCharacteristic(Characteristic): + """A read-only characteristic that returns a list of SSIDs.""" + def __init__(self, service): + Characteristic.__init__( + self, SSID_NAMES_CHARACTERISTIC_UUID, ["read"], service) + self.add_descriptor(SSIDListDescriptor(self)) + + def ReadValue(self, options): + """Returns the list of SSIDs joined by newline characters.""" + value = [] + ssid_names_str = self.service.get_ssid_names() + + for c in ssid_names_str: + value.append(dbus.Byte(c.encode())) + + return value + +class SSIDListDescriptor(Descriptor): + TEMP_DESCRIPTOR_UUID = "2901" + TEMP_DESCRIPTOR_VALUE = "SSID List" + + def __init__(self, characteristic): + Descriptor.__init__(self, self.TEMP_DESCRIPTOR_UUID, ["read"], characteristic) + + def ReadValue(self, options): + value = [] + desc = self.TEMP_DESCRIPTOR_VALUE + for c in desc: + value.append(dbus.Byte(c.encode())) + return value + +class SSIDInfoCharacteristic(Characteristic): + """A read/write characteristic for sending an SSID and password.""" + def __init__(self, service): + Characteristic.__init__(self, SSID_INFO_CHARACTERISTIC_UUID, ["read", "write", "notify"], service) #Adding "notify" to enable notifications + self.add_descriptor(SSIDDescriptor(self)) + self.notifying = False #To track if a client is receiving notifications + + def ReadValue(self, options): + """Returns the last sent SSID and password (if any).""" + value = [] + ssid_info_str = (self.service.selectedSSID + ":" + self.service.password) or "No SSID info set" + + for c in ssid_info_str: + value.append(dbus.Byte(c.encode())) + + return value + + def WriteValue(self, value, options): + """Writes an SSID and password sent from the client.""" + ssid_info = "".join([chr(b) for b in value]) # Convert bytes back to string + self.service.set_ssid_info(ssid_info) + print(f"Received SSID and password: {ssid_info}") + + def StartNotify(self): + """Called when a client subscribes to notifications.""" + if self.notifying: + return + + self.notifying = True + # If needed, start sending notifications, e.g., a periodic update like the CPU temperature example + # self.add_timeout(NOTIFY_TIMEOUT, self.some_notify_callback) + + def StopNotify(self): + """Called when a client unsubscribes from notifications.""" + self.notifying = False + +class SSIDDescriptor(Descriptor): + """Descriptor providing additional information about the SSID characteristics.""" + SSID_DESCRIPTOR_UUID = "2901" + SSID_DESCRIPTOR_VALUE = "SSID name and password" + + def __init__(self, characteristic): + Descriptor.__init__(self, self.SSID_DESCRIPTOR_UUID, ["read"], characteristic) + + def ReadValue(self, options): + value = [] + desc = self.SSID_DESCRIPTOR_VALUE + for c in desc: + value.append(dbus.Byte(c.encode())) + return value + +# Main Application +app = Application() +app.add_service(SSIDService(0)) +app.register() + +adv = SSIDAdvertisement(0) +adv.register() + +try: + app.run() +except KeyboardInterrupt: + app.quit() diff --git a/BLE/advertisement.py b/BLE/advertisement.py new file mode 100644 index 0000000000000000000000000000000000000000..86a31c545ba147b39580cb80e759f7b085512a1b --- /dev/null +++ b/BLE/advertisement.py @@ -0,0 +1,134 @@ +"""Copyright (c) 2019, Douglas Otwell + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import dbus +import dbus.service + +from bletools import BleTools + +BLUEZ_SERVICE_NAME = "org.bluez" +LE_ADVERTISING_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1" +DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" +DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties" +LE_ADVERTISEMENT_IFACE = "org.bluez.LEAdvertisement1" + + +class Advertisement(dbus.service.Object): + PATH_BASE = "/org/bluez/example/advertisement" + + def __init__(self, index, advertising_type): + self.path = self.PATH_BASE + str(index) + self.bus = BleTools.get_bus() + self.ad_type = advertising_type + self.local_name = None + self.service_uuids = None + self.solicit_uuids = None + self.manufacturer_data = None + self.service_data = None + self.include_tx_power = None + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + properties = dict() + properties["Type"] = self.ad_type + + if self.local_name is not None: + properties["LocalName"] = dbus.String(self.local_name) + + if self.service_uuids is not None: + properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, + signature='s') + if self.solicit_uuids is not None: + properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, + signature='s') + if self.manufacturer_data is not None: + properties["ManufacturerData"] = dbus.Dictionary( + self.manufacturer_data, signature='qv') + + if self.service_data is not None: + properties["ServiceData"] = dbus.Dictionary(self.service_data, + signature='sv') + if self.include_tx_power is not None: + properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power) + + if self.local_name is not None: + properties["LocalName"] = dbus.String(self.local_name) + + return {LE_ADVERTISEMENT_IFACE: properties} + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_service_uuid(self, uuid): + if not self.service_uuids: + self.service_uuids = [] + self.service_uuids.append(uuid) + + def add_solicit_uuid(self, uuid): + if not self.solicit_uuids: + self.solicit_uuids = [] + self.solicit_uuids.append(uuid) + + def add_manufacturer_data(self, manuf_code, data): + if not self.manufacturer_data: + self.manufacturer_data = dbus.Dictionary({}, signature="qv") + self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y") + + def add_service_data(self, uuid, data): + if not self.service_data: + self.service_data = dbus.Dictionary({}, signature="sv") + self.service_data[uuid] = dbus.Array(data, signature="y") + + def add_local_name(self, name): + if not self.local_name: + self.local_name = "" + self.local_name = dbus.String(name) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature="s", + out_signature="a{sv}") + def GetAll(self, interface): + if interface != LE_ADVERTISEMENT_IFACE: + raise InvalidArgsException() + + return self.get_properties()[LE_ADVERTISEMENT_IFACE] + + @dbus.service.method(LE_ADVERTISEMENT_IFACE, + in_signature='', + out_signature='') + def Release(self): + print ('%s: Released!' % self.path) + + def register_ad_callback(self): + print("GATT advertisement registered") + + def register_ad_error_callback(self): + print("Failed to register GATT advertisement") + + def register(self): + bus = BleTools.get_bus() + adapter = BleTools.find_adapter(bus) + + ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + LE_ADVERTISING_MANAGER_IFACE) + ad_manager.RegisterAdvertisement(self.get_path(), {}, + reply_handler=self.register_ad_callback, + error_handler=self.register_ad_error_callback) diff --git a/BLE/bletools.py b/BLE/bletools.py new file mode 100644 index 0000000000000000000000000000000000000000..33cc9f09dea9511faccb9f4d6aa37721e1433560 --- /dev/null +++ b/BLE/bletools.py @@ -0,0 +1,57 @@ +"""Copyright (c) 2019, Douglas Otwell + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import dbus +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject + +BLUEZ_SERVICE_NAME = "org.bluez" +LE_ADVERTISING_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1" +DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" + +class BleTools(object): + @classmethod + def get_bus(self): + bus = dbus.SystemBus() + + return bus + + @classmethod + def find_adapter(self, bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, "/"), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + + for o, props in objects.items(): + if LE_ADVERTISING_MANAGER_IFACE in props: + return o + + return None + + @classmethod + def power_adapter(self): + adapter = self.get_adapter() + + adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + "org.freedesktop.DBus.Properties"); + adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) diff --git a/BLE/service.py b/BLE/service.py new file mode 100644 index 0000000000000000000000000000000000000000..72ead6a9101c10aca50ab48a3d253f0b9596c9f4 --- /dev/null +++ b/BLE/service.py @@ -0,0 +1,315 @@ +"""Copyright (c) 2019, Douglas Otwell + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import dbus +import dbus.mainloop.glib +import dbus.exceptions +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject +from bletools import BleTools + +BLUEZ_SERVICE_NAME = "org.bluez" +GATT_MANAGER_IFACE = "org.bluez.GattManager1" +DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" +DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties" +GATT_SERVICE_IFACE = "org.bluez.GattService1" +GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1" +GATT_DESC_IFACE = "org.bluez.GattDescriptor1" + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs" + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = "org.bluez.Error.NotSupported" + +class NotPermittedException(dbus.exceptions.DBusException): + _dbus_error_name = "org.bluez.Error.NotPermitted" + +class Application(dbus.service.Object): + def __init__(self): + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + self.mainloop = GObject.MainLoop() + self.bus = BleTools.get_bus() + self.path = "/" + self.services = [] + self.next_index = 0 + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_service(self, service): + self.services.append(service) + + @dbus.service.method(DBUS_OM_IFACE, out_signature = "a{oa{sa{sv}}}") + def GetManagedObjects(self): + response = {} + + for service in self.services: + response[service.get_path()] = service.get_properties() + chrcs = service.get_characteristics() + for chrc in chrcs: + response[chrc.get_path()] = chrc.get_properties() + descs = chrc.get_descriptors() + for desc in descs: + response[desc.get_path()] = desc.get_properties() + + return response + + def register_app_callback(self): + print("GATT application registered") + + def register_app_error_callback(self, error): + print("Failed to register application: " + str(error)) + + def register(self): + adapter = BleTools.find_adapter(self.bus) + + service_manager = dbus.Interface( + self.bus.get_object(BLUEZ_SERVICE_NAME, adapter), + GATT_MANAGER_IFACE) + + service_manager.RegisterApplication(self.get_path(), {}, + reply_handler=self.register_app_callback, + error_handler=self.register_app_error_callback) + + def run(self): + self.mainloop.run() + + def quit(self): + print("\nGATT application terminated") + self.mainloop.quit() + +class Service(dbus.service.Object): + PATH_BASE = "/org/bluez/example/service" + + def __init__(self, index, uuid, primary): + self.bus = BleTools.get_bus() + self.path = self.PATH_BASE + str(index) + self.uuid = uuid + self.primary = primary + self.characteristics = [] + self.next_index = 0 + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + return { + GATT_SERVICE_IFACE: { + 'UUID': self.uuid, + 'Primary': self.primary, + 'Characteristics': dbus.Array( + self.get_characteristic_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_characteristic(self, characteristic): + self.characteristics.append(characteristic) + + def get_characteristic_paths(self): + result = [] + for chrc in self.characteristics: + result.append(chrc.get_path()) + return result + + def get_characteristics(self): + return self.characteristics + + def get_bus(self): + return self.bus + + def get_next_index(self): + idx = self.next_index + self.next_index += 1 + + return idx + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_SERVICE_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_SERVICE_IFACE] + +class Characteristic(dbus.service.Object): + """ + org.bluez.GattCharacteristic1 interface implementation + """ + def __init__(self, uuid, flags, service): + index = service.get_next_index() + self.path = service.path + '/char' + str(index) + self.bus = service.get_bus() + self.uuid = uuid + self.service = service + self.flags = flags + self.descriptors = [] + self.next_index = 0 + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + return { + GATT_CHRC_IFACE: { + 'Service': self.service.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + 'Descriptors': dbus.Array( + self.get_descriptor_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_descriptor(self, descriptor): + self.descriptors.append(descriptor) + + def get_descriptor_paths(self): + result = [] + for desc in self.descriptors: + result.append(desc.get_path()) + return result + + def get_descriptors(self): + return self.descriptors + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_CHRC_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_CHRC_IFACE, + in_signature='a{sv}', + out_signature='ay') + def ReadValue(self, options): + print('Default ReadValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}') + def WriteValue(self, value, options): + print('Default WriteValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StartNotify(self): + print('Default StartNotify called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StopNotify(self): + print('Default StopNotify called, returning error') + raise NotSupportedException() + + @dbus.service.signal(DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + pass + + def get_bus(self): + bus = self.bus + + return bus + + def get_next_index(self): + idx = self.next_index + self.next_index += 1 + + return idx + + def add_timeout(self, timeout, callback): + GObject.timeout_add(timeout, callback) + + +class Descriptor(dbus.service.Object): + def __init__(self, uuid, flags, characteristic): + index = characteristic.get_next_index() + self.path = characteristic.path + '/desc' + str(index) + self.uuid = uuid + self.flags = flags + self.chrc = characteristic + self.bus = characteristic.get_bus() + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + return { + GATT_DESC_IFACE: { + 'Characteristic': self.chrc.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_DESC_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_DESC_IFACE] + + @dbus.service.method(GATT_DESC_IFACE, + in_signature='a{sv}', + out_signature='ay') + def ReadValue(self, options): + print ('Default ReadValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}') + def WriteValue(self, value, options): + print('Default WriteValue called, returning error') + raise NotSupportedException() + + +class CharacteristicUserDescriptionDescriptor(Descriptor): + CUD_UUID = '2901' + + def __init__(self, bus, index, characteristic): + self.writable = 'writable-auxiliaries' in characteristic.flags + self.value = array.array('B', b'This is a characteristic for testing') + self.value = self.value.tolist() + Descriptor.__init__( + self, bus, index, + self.CUD_UUID, + ['read', 'write'], + characteristic) + + def ReadValue(self, options): + return self.value + + def WriteValue(self, value, options): + if not self.writable: + raise NotPermittedException() + self.value = value