diff --git a/BLE/advertisement.py b/BLE/advertisement.py new file mode 100644 index 0000000000000000000000000000000000000000..86a31c545ba147b39580cb80e759f7b085512a1b --- /dev/null +++ b/BLE/advertisement.py @@ -0,0 +1,134 @@ +"""Copyright (c) 2019, Douglas Otwell + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import dbus +import dbus.service + +from bletools import BleTools + +BLUEZ_SERVICE_NAME = "org.bluez" +LE_ADVERTISING_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1" +DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" +DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties" +LE_ADVERTISEMENT_IFACE = "org.bluez.LEAdvertisement1" + + +class Advertisement(dbus.service.Object): + PATH_BASE = "/org/bluez/example/advertisement" + + def __init__(self, index, advertising_type): + self.path = self.PATH_BASE + str(index) + self.bus = BleTools.get_bus() + self.ad_type = advertising_type + self.local_name = None + self.service_uuids = None + self.solicit_uuids = None + self.manufacturer_data = None + self.service_data = None + self.include_tx_power = None + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + properties = dict() + properties["Type"] = self.ad_type + + if self.local_name is not None: + properties["LocalName"] = dbus.String(self.local_name) + + if self.service_uuids is not None: + properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, + signature='s') + if self.solicit_uuids is not None: + properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, + signature='s') + if self.manufacturer_data is not None: + properties["ManufacturerData"] = dbus.Dictionary( + self.manufacturer_data, signature='qv') + + if self.service_data is not None: + properties["ServiceData"] = dbus.Dictionary(self.service_data, + signature='sv') + if self.include_tx_power is not None: + properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power) + + if self.local_name is not None: + properties["LocalName"] = dbus.String(self.local_name) + + return {LE_ADVERTISEMENT_IFACE: properties} + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_service_uuid(self, uuid): + if not self.service_uuids: + self.service_uuids = [] + self.service_uuids.append(uuid) + + def add_solicit_uuid(self, uuid): + if not self.solicit_uuids: + self.solicit_uuids = [] + self.solicit_uuids.append(uuid) + + def add_manufacturer_data(self, manuf_code, data): + if not self.manufacturer_data: + self.manufacturer_data = dbus.Dictionary({}, signature="qv") + self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y") + + def add_service_data(self, uuid, data): + if not self.service_data: + self.service_data = dbus.Dictionary({}, signature="sv") + self.service_data[uuid] = dbus.Array(data, signature="y") + + def add_local_name(self, name): + if not self.local_name: + self.local_name = "" + self.local_name = dbus.String(name) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature="s", + out_signature="a{sv}") + def GetAll(self, interface): + if interface != LE_ADVERTISEMENT_IFACE: + raise InvalidArgsException() + + return self.get_properties()[LE_ADVERTISEMENT_IFACE] + + @dbus.service.method(LE_ADVERTISEMENT_IFACE, + in_signature='', + out_signature='') + def Release(self): + print ('%s: Released!' % self.path) + + def register_ad_callback(self): + print("GATT advertisement registered") + + def register_ad_error_callback(self): + print("Failed to register GATT advertisement") + + def register(self): + bus = BleTools.get_bus() + adapter = BleTools.find_adapter(bus) + + ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + LE_ADVERTISING_MANAGER_IFACE) + ad_manager.RegisterAdvertisement(self.get_path(), {}, + reply_handler=self.register_ad_callback, + error_handler=self.register_ad_error_callback) diff --git a/BLE/service.py b/BLE/service.py new file mode 100644 index 0000000000000000000000000000000000000000..72ead6a9101c10aca50ab48a3d253f0b9596c9f4 --- /dev/null +++ b/BLE/service.py @@ -0,0 +1,315 @@ +"""Copyright (c) 2019, Douglas Otwell + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import dbus +import dbus.mainloop.glib +import dbus.exceptions +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject +from bletools import BleTools + +BLUEZ_SERVICE_NAME = "org.bluez" +GATT_MANAGER_IFACE = "org.bluez.GattManager1" +DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager" +DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties" +GATT_SERVICE_IFACE = "org.bluez.GattService1" +GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1" +GATT_DESC_IFACE = "org.bluez.GattDescriptor1" + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs" + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = "org.bluez.Error.NotSupported" + +class NotPermittedException(dbus.exceptions.DBusException): + _dbus_error_name = "org.bluez.Error.NotPermitted" + +class Application(dbus.service.Object): + def __init__(self): + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + self.mainloop = GObject.MainLoop() + self.bus = BleTools.get_bus() + self.path = "/" + self.services = [] + self.next_index = 0 + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_service(self, service): + self.services.append(service) + + @dbus.service.method(DBUS_OM_IFACE, out_signature = "a{oa{sa{sv}}}") + def GetManagedObjects(self): + response = {} + + for service in self.services: + response[service.get_path()] = service.get_properties() + chrcs = service.get_characteristics() + for chrc in chrcs: + response[chrc.get_path()] = chrc.get_properties() + descs = chrc.get_descriptors() + for desc in descs: + response[desc.get_path()] = desc.get_properties() + + return response + + def register_app_callback(self): + print("GATT application registered") + + def register_app_error_callback(self, error): + print("Failed to register application: " + str(error)) + + def register(self): + adapter = BleTools.find_adapter(self.bus) + + service_manager = dbus.Interface( + self.bus.get_object(BLUEZ_SERVICE_NAME, adapter), + GATT_MANAGER_IFACE) + + service_manager.RegisterApplication(self.get_path(), {}, + reply_handler=self.register_app_callback, + error_handler=self.register_app_error_callback) + + def run(self): + self.mainloop.run() + + def quit(self): + print("\nGATT application terminated") + self.mainloop.quit() + +class Service(dbus.service.Object): + PATH_BASE = "/org/bluez/example/service" + + def __init__(self, index, uuid, primary): + self.bus = BleTools.get_bus() + self.path = self.PATH_BASE + str(index) + self.uuid = uuid + self.primary = primary + self.characteristics = [] + self.next_index = 0 + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + return { + GATT_SERVICE_IFACE: { + 'UUID': self.uuid, + 'Primary': self.primary, + 'Characteristics': dbus.Array( + self.get_characteristic_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_characteristic(self, characteristic): + self.characteristics.append(characteristic) + + def get_characteristic_paths(self): + result = [] + for chrc in self.characteristics: + result.append(chrc.get_path()) + return result + + def get_characteristics(self): + return self.characteristics + + def get_bus(self): + return self.bus + + def get_next_index(self): + idx = self.next_index + self.next_index += 1 + + return idx + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_SERVICE_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_SERVICE_IFACE] + +class Characteristic(dbus.service.Object): + """ + org.bluez.GattCharacteristic1 interface implementation + """ + def __init__(self, uuid, flags, service): + index = service.get_next_index() + self.path = service.path + '/char' + str(index) + self.bus = service.get_bus() + self.uuid = uuid + self.service = service + self.flags = flags + self.descriptors = [] + self.next_index = 0 + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + return { + GATT_CHRC_IFACE: { + 'Service': self.service.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + 'Descriptors': dbus.Array( + self.get_descriptor_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_descriptor(self, descriptor): + self.descriptors.append(descriptor) + + def get_descriptor_paths(self): + result = [] + for desc in self.descriptors: + result.append(desc.get_path()) + return result + + def get_descriptors(self): + return self.descriptors + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_CHRC_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_CHRC_IFACE, + in_signature='a{sv}', + out_signature='ay') + def ReadValue(self, options): + print('Default ReadValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}') + def WriteValue(self, value, options): + print('Default WriteValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StartNotify(self): + print('Default StartNotify called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StopNotify(self): + print('Default StopNotify called, returning error') + raise NotSupportedException() + + @dbus.service.signal(DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + pass + + def get_bus(self): + bus = self.bus + + return bus + + def get_next_index(self): + idx = self.next_index + self.next_index += 1 + + return idx + + def add_timeout(self, timeout, callback): + GObject.timeout_add(timeout, callback) + + +class Descriptor(dbus.service.Object): + def __init__(self, uuid, flags, characteristic): + index = characteristic.get_next_index() + self.path = characteristic.path + '/desc' + str(index) + self.uuid = uuid + self.flags = flags + self.chrc = characteristic + self.bus = characteristic.get_bus() + dbus.service.Object.__init__(self, self.bus, self.path) + + def get_properties(self): + return { + GATT_DESC_IFACE: { + 'Characteristic': self.chrc.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_DESC_IFACE: + raise InvalidArgsException() + + return self.get_properties()[GATT_DESC_IFACE] + + @dbus.service.method(GATT_DESC_IFACE, + in_signature='a{sv}', + out_signature='ay') + def ReadValue(self, options): + print ('Default ReadValue called, returning error') + raise NotSupportedException() + + @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}') + def WriteValue(self, value, options): + print('Default WriteValue called, returning error') + raise NotSupportedException() + + +class CharacteristicUserDescriptionDescriptor(Descriptor): + CUD_UUID = '2901' + + def __init__(self, bus, index, characteristic): + self.writable = 'writable-auxiliaries' in characteristic.flags + self.value = array.array('B', b'This is a characteristic for testing') + self.value = self.value.tolist() + Descriptor.__init__( + self, bus, index, + self.CUD_UUID, + ['read', 'write'], + characteristic) + + def ReadValue(self, options): + return self.value + + def WriteValue(self, value, options): + if not self.writable: + raise NotPermittedException() + self.value = value