From 967b9afa795bc597092c25d6faa2668fb2f8d73d Mon Sep 17 00:00:00 2001
From: Paul-Winpenny <92634321+Paul-Winpenny@users.noreply.github.com>
Date: Thu, 7 Nov 2024 16:18:35 +0000
Subject: [PATCH] added the other python files required for the BLE RPi

---
 BLE/advertisement.py | 134 ++++++++++++++++++
 BLE/service.py       | 315 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 449 insertions(+)
 create mode 100644 BLE/advertisement.py
 create mode 100644 BLE/service.py

diff --git a/BLE/advertisement.py b/BLE/advertisement.py
new file mode 100644
index 00000000..86a31c54
--- /dev/null
+++ b/BLE/advertisement.py
@@ -0,0 +1,134 @@
+"""Copyright (c) 2019, Douglas Otwell
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+import dbus
+import dbus.service
+
+from bletools import BleTools
+
+BLUEZ_SERVICE_NAME = "org.bluez"
+LE_ADVERTISING_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1"
+DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
+DBUS_PROP_IFACE = "org.freedesktop.DBus.Properties"
+LE_ADVERTISEMENT_IFACE = "org.bluez.LEAdvertisement1"
+
+
+class Advertisement(dbus.service.Object):
+    PATH_BASE = "/org/bluez/example/advertisement"
+
+    def __init__(self, index, advertising_type):
+        self.path = self.PATH_BASE + str(index)
+        self.bus = BleTools.get_bus()
+        self.ad_type = advertising_type
+        self.local_name = None
+        self.service_uuids = None
+        self.solicit_uuids = None
+        self.manufacturer_data = None
+        self.service_data = None
+        self.include_tx_power = None
+        dbus.service.Object.__init__(self, self.bus, self.path)
+
+    def get_properties(self):
+        properties = dict()
+        properties["Type"] = self.ad_type
+
+        if self.local_name is not None:
+            properties["LocalName"] = dbus.String(self.local_name)
+
+        if self.service_uuids is not None:
+            properties["ServiceUUIDs"] = dbus.Array(self.service_uuids,
+                                                    signature='s')
+        if self.solicit_uuids is not None:
+            properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids,
+                                                    signature='s')
+        if self.manufacturer_data is not None:
+            properties["ManufacturerData"] = dbus.Dictionary(
+                self.manufacturer_data, signature='qv')
+
+        if self.service_data is not None:
+            properties["ServiceData"] = dbus.Dictionary(self.service_data,
+                                                        signature='sv')
+        if self.include_tx_power is not None:
+            properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)
+
+        if self.local_name is not None:
+            properties["LocalName"] = dbus.String(self.local_name)
+
+        return {LE_ADVERTISEMENT_IFACE: properties}
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_service_uuid(self, uuid):
+        if not self.service_uuids:
+            self.service_uuids = []
+        self.service_uuids.append(uuid)
+
+    def add_solicit_uuid(self, uuid):
+        if not self.solicit_uuids:
+            self.solicit_uuids = []
+        self.solicit_uuids.append(uuid)
+
+    def add_manufacturer_data(self, manuf_code, data):
+        if not self.manufacturer_data:
+            self.manufacturer_data = dbus.Dictionary({}, signature="qv")
+        self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y")
+
+    def add_service_data(self, uuid, data):
+        if not self.service_data:
+            self.service_data = dbus.Dictionary({}, signature="sv")
+        self.service_data[uuid] = dbus.Array(data, signature="y")
+
+    def add_local_name(self, name):
+        if not self.local_name:
+            self.local_name = ""
+        self.local_name = dbus.String(name)
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature="s",
+                         out_signature="a{sv}")
+    def GetAll(self, interface):
+        if interface != LE_ADVERTISEMENT_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties()[LE_ADVERTISEMENT_IFACE]
+
+    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
+                         in_signature='',
+                         out_signature='')
+    def Release(self):
+        print ('%s: Released!' % self.path)
+
+    def register_ad_callback(self):
+        print("GATT advertisement registered")
+
+    def register_ad_error_callback(self):
+        print("Failed to register GATT advertisement")
+
+    def register(self):
+        bus = BleTools.get_bus()
+        adapter = BleTools.find_adapter(bus)
+
+        ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+                                LE_ADVERTISING_MANAGER_IFACE)
+        ad_manager.RegisterAdvertisement(self.get_path(), {},
+                                     reply_handler=self.register_ad_callback,
+                                     error_handler=self.register_ad_error_callback)
diff --git a/BLE/service.py b/BLE/service.py
new file mode 100644
index 00000000..72ead6a9
--- /dev/null
+++ b/BLE/service.py
@@ -0,0 +1,315 @@
+"""Copyright (c) 2019, Douglas Otwell
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+import dbus
+import dbus.mainloop.glib
+import dbus.exceptions
+try:
+  from gi.repository import GObject
+except ImportError:
+    import gobject as GObject
+from bletools import BleTools
+
+BLUEZ_SERVICE_NAME = "org.bluez"
+GATT_MANAGER_IFACE = "org.bluez.GattManager1"
+DBUS_OM_IFACE =      "org.freedesktop.DBus.ObjectManager"
+DBUS_PROP_IFACE =    "org.freedesktop.DBus.Properties"
+GATT_SERVICE_IFACE = "org.bluez.GattService1"
+GATT_CHRC_IFACE =    "org.bluez.GattCharacteristic1"
+GATT_DESC_IFACE =    "org.bluez.GattDescriptor1"
+
+class InvalidArgsException(dbus.exceptions.DBusException):
+    _dbus_error_name = "org.freedesktop.DBus.Error.InvalidArgs"
+
+class NotSupportedException(dbus.exceptions.DBusException):
+    _dbus_error_name = "org.bluez.Error.NotSupported"
+
+class NotPermittedException(dbus.exceptions.DBusException):
+    _dbus_error_name = "org.bluez.Error.NotPermitted"
+
+class Application(dbus.service.Object):
+    def __init__(self):
+        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+        self.mainloop = GObject.MainLoop()
+        self.bus = BleTools.get_bus()
+        self.path = "/"
+        self.services = []
+        self.next_index = 0
+        dbus.service.Object.__init__(self, self.bus, self.path)
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_service(self, service):
+        self.services.append(service)
+
+    @dbus.service.method(DBUS_OM_IFACE, out_signature = "a{oa{sa{sv}}}")
+    def GetManagedObjects(self):
+        response = {}
+
+        for service in self.services:
+            response[service.get_path()] = service.get_properties()
+            chrcs = service.get_characteristics()
+            for chrc in chrcs:
+                response[chrc.get_path()] = chrc.get_properties()
+                descs = chrc.get_descriptors()
+                for desc in descs:
+                    response[desc.get_path()] = desc.get_properties()
+
+        return response
+
+    def register_app_callback(self):
+        print("GATT application registered")
+
+    def register_app_error_callback(self, error):
+        print("Failed to register application: " + str(error))
+
+    def register(self):
+        adapter = BleTools.find_adapter(self.bus)
+
+        service_manager = dbus.Interface(
+                self.bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+                GATT_MANAGER_IFACE)
+
+        service_manager.RegisterApplication(self.get_path(), {},
+                reply_handler=self.register_app_callback,
+                error_handler=self.register_app_error_callback)
+
+    def run(self):
+        self.mainloop.run()
+
+    def quit(self):
+        print("\nGATT application terminated")
+        self.mainloop.quit()
+
+class Service(dbus.service.Object):
+    PATH_BASE = "/org/bluez/example/service"
+
+    def __init__(self, index, uuid, primary):
+        self.bus = BleTools.get_bus()
+        self.path = self.PATH_BASE + str(index)
+        self.uuid = uuid
+        self.primary = primary
+        self.characteristics = []
+        self.next_index = 0
+        dbus.service.Object.__init__(self, self.bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_SERVICE_IFACE: {
+                        'UUID': self.uuid,
+                        'Primary': self.primary,
+                        'Characteristics': dbus.Array(
+                                self.get_characteristic_paths(),
+                                signature='o')
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_characteristic(self, characteristic):
+        self.characteristics.append(characteristic)
+
+    def get_characteristic_paths(self):
+        result = []
+        for chrc in self.characteristics:
+            result.append(chrc.get_path())
+        return result
+
+    def get_characteristics(self):
+        return self.characteristics
+
+    def get_bus(self):
+        return self.bus
+
+    def get_next_index(self):
+        idx = self.next_index
+        self.next_index += 1
+
+        return idx
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_SERVICE_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties()[GATT_SERVICE_IFACE]
+
+class Characteristic(dbus.service.Object):
+    """
+    org.bluez.GattCharacteristic1 interface implementation
+    """
+    def __init__(self, uuid, flags, service):
+        index = service.get_next_index()
+        self.path = service.path + '/char' + str(index)
+        self.bus = service.get_bus()
+        self.uuid = uuid
+        self.service = service
+        self.flags = flags
+        self.descriptors = []
+        self.next_index = 0
+        dbus.service.Object.__init__(self, self.bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_CHRC_IFACE: {
+                        'Service': self.service.get_path(),
+                        'UUID': self.uuid,
+                        'Flags': self.flags,
+                        'Descriptors': dbus.Array(
+                                self.get_descriptor_paths(),
+                                signature='o')
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_descriptor(self, descriptor):
+        self.descriptors.append(descriptor)
+
+    def get_descriptor_paths(self):
+        result = []
+        for desc in self.descriptors:
+            result.append(desc.get_path())
+        return result
+
+    def get_descriptors(self):
+        return self.descriptors
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_CHRC_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties()[GATT_CHRC_IFACE]
+
+    @dbus.service.method(GATT_CHRC_IFACE,
+                        in_signature='a{sv}',
+                        out_signature='ay')
+    def ReadValue(self, options):
+        print('Default ReadValue called, returning error')
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
+    def WriteValue(self, value, options):
+        print('Default WriteValue called, returning error')
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE)
+    def StartNotify(self):
+        print('Default StartNotify called, returning error')
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_CHRC_IFACE)
+    def StopNotify(self):
+        print('Default StopNotify called, returning error')
+        raise NotSupportedException()
+
+    @dbus.service.signal(DBUS_PROP_IFACE,
+                         signature='sa{sv}as')
+    def PropertiesChanged(self, interface, changed, invalidated):
+        pass
+
+    def get_bus(self):
+        bus = self.bus
+
+        return bus
+
+    def get_next_index(self):
+        idx = self.next_index
+        self.next_index += 1
+
+        return idx
+
+    def add_timeout(self, timeout, callback):
+        GObject.timeout_add(timeout, callback)
+
+
+class Descriptor(dbus.service.Object):
+    def __init__(self, uuid, flags, characteristic):
+        index = characteristic.get_next_index()
+        self.path = characteristic.path + '/desc' + str(index)
+        self.uuid = uuid
+        self.flags = flags
+        self.chrc = characteristic
+        self.bus = characteristic.get_bus()
+        dbus.service.Object.__init__(self, self.bus, self.path)
+
+    def get_properties(self):
+        return {
+                GATT_DESC_IFACE: {
+                        'Characteristic': self.chrc.get_path(),
+                        'UUID': self.uuid,
+                        'Flags': self.flags,
+                }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_DESC_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties()[GATT_DESC_IFACE]
+
+    @dbus.service.method(GATT_DESC_IFACE,
+                        in_signature='a{sv}',
+                        out_signature='ay')
+    def ReadValue(self, options):
+        print ('Default ReadValue called, returning error')
+        raise NotSupportedException()
+
+    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
+    def WriteValue(self, value, options):
+        print('Default WriteValue called, returning error')
+        raise NotSupportedException()
+
+
+class CharacteristicUserDescriptionDescriptor(Descriptor):
+    CUD_UUID = '2901'
+
+    def __init__(self, bus, index, characteristic):
+        self.writable = 'writable-auxiliaries' in characteristic.flags
+        self.value = array.array('B', b'This is a characteristic for testing')
+        self.value = self.value.tolist()
+        Descriptor.__init__(
+                self, bus, index,
+                self.CUD_UUID,
+                ['read', 'write'],
+                characteristic)
+
+    def ReadValue(self, options):
+        return self.value
+
+    def WriteValue(self, value, options):
+        if not self.writable:
+            raise NotPermittedException()
+        self.value = value
-- 
GitLab