Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
R
Robobin
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Package Registry
Model registry
Operate
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
plw1g21
Robobin
Commits
967b9afa
Commit
967b9afa
authored
5 months ago
by
Paul-Winpenny
Browse files
Options
Downloads
Patches
Plain Diff
added the other python files required for the BLE RPi
parent
349d193e
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
BLE/advertisement.py
+134
-0
134 additions, 0 deletions
BLE/advertisement.py
BLE/service.py
+315
-0
315 additions, 0 deletions
BLE/service.py
with
449 additions
and
0 deletions
BLE/advertisement.py
0 → 100644
+
134
−
0
View file @
967b9afa
"""
Copyright (c) 2019, Douglas Otwell
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the
"
Software
"
), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED
"
AS IS
"
, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import
dbus
import
dbus.service
from
bletools
import
BleTools
BLUEZ_SERVICE_NAME
=
"
org.bluez
"
LE_ADVERTISING_MANAGER_IFACE
=
"
org.bluez.LEAdvertisingManager1
"
DBUS_OM_IFACE
=
"
org.freedesktop.DBus.ObjectManager
"
DBUS_PROP_IFACE
=
"
org.freedesktop.DBus.Properties
"
LE_ADVERTISEMENT_IFACE
=
"
org.bluez.LEAdvertisement1
"
class
Advertisement
(
dbus
.
service
.
Object
):
PATH_BASE
=
"
/org/bluez/example/advertisement
"
def
__init__
(
self
,
index
,
advertising_type
):
self
.
path
=
self
.
PATH_BASE
+
str
(
index
)
self
.
bus
=
BleTools
.
get_bus
()
self
.
ad_type
=
advertising_type
self
.
local_name
=
None
self
.
service_uuids
=
None
self
.
solicit_uuids
=
None
self
.
manufacturer_data
=
None
self
.
service_data
=
None
self
.
include_tx_power
=
None
dbus
.
service
.
Object
.
__init__
(
self
,
self
.
bus
,
self
.
path
)
def
get_properties
(
self
):
properties
=
dict
()
properties
[
"
Type
"
]
=
self
.
ad_type
if
self
.
local_name
is
not
None
:
properties
[
"
LocalName
"
]
=
dbus
.
String
(
self
.
local_name
)
if
self
.
service_uuids
is
not
None
:
properties
[
"
ServiceUUIDs
"
]
=
dbus
.
Array
(
self
.
service_uuids
,
signature
=
'
s
'
)
if
self
.
solicit_uuids
is
not
None
:
properties
[
"
SolicitUUIDs
"
]
=
dbus
.
Array
(
self
.
solicit_uuids
,
signature
=
'
s
'
)
if
self
.
manufacturer_data
is
not
None
:
properties
[
"
ManufacturerData
"
]
=
dbus
.
Dictionary
(
self
.
manufacturer_data
,
signature
=
'
qv
'
)
if
self
.
service_data
is
not
None
:
properties
[
"
ServiceData
"
]
=
dbus
.
Dictionary
(
self
.
service_data
,
signature
=
'
sv
'
)
if
self
.
include_tx_power
is
not
None
:
properties
[
"
IncludeTxPower
"
]
=
dbus
.
Boolean
(
self
.
include_tx_power
)
if
self
.
local_name
is
not
None
:
properties
[
"
LocalName
"
]
=
dbus
.
String
(
self
.
local_name
)
return
{
LE_ADVERTISEMENT_IFACE
:
properties
}
def
get_path
(
self
):
return
dbus
.
ObjectPath
(
self
.
path
)
def
add_service_uuid
(
self
,
uuid
):
if
not
self
.
service_uuids
:
self
.
service_uuids
=
[]
self
.
service_uuids
.
append
(
uuid
)
def
add_solicit_uuid
(
self
,
uuid
):
if
not
self
.
solicit_uuids
:
self
.
solicit_uuids
=
[]
self
.
solicit_uuids
.
append
(
uuid
)
def
add_manufacturer_data
(
self
,
manuf_code
,
data
):
if
not
self
.
manufacturer_data
:
self
.
manufacturer_data
=
dbus
.
Dictionary
({},
signature
=
"
qv
"
)
self
.
manufacturer_data
[
manuf_code
]
=
dbus
.
Array
(
data
,
signature
=
"
y
"
)
def
add_service_data
(
self
,
uuid
,
data
):
if
not
self
.
service_data
:
self
.
service_data
=
dbus
.
Dictionary
({},
signature
=
"
sv
"
)
self
.
service_data
[
uuid
]
=
dbus
.
Array
(
data
,
signature
=
"
y
"
)
def
add_local_name
(
self
,
name
):
if
not
self
.
local_name
:
self
.
local_name
=
""
self
.
local_name
=
dbus
.
String
(
name
)
@dbus.service.method
(
DBUS_PROP_IFACE
,
in_signature
=
"
s
"
,
out_signature
=
"
a{sv}
"
)
def
GetAll
(
self
,
interface
):
if
interface
!=
LE_ADVERTISEMENT_IFACE
:
raise
InvalidArgsException
()
return
self
.
get_properties
()[
LE_ADVERTISEMENT_IFACE
]
@dbus.service.method
(
LE_ADVERTISEMENT_IFACE
,
in_signature
=
''
,
out_signature
=
''
)
def
Release
(
self
):
print
(
'
%s: Released!
'
%
self
.
path
)
def
register_ad_callback
(
self
):
print
(
"
GATT advertisement registered
"
)
def
register_ad_error_callback
(
self
):
print
(
"
Failed to register GATT advertisement
"
)
def
register
(
self
):
bus
=
BleTools
.
get_bus
()
adapter
=
BleTools
.
find_adapter
(
bus
)
ad_manager
=
dbus
.
Interface
(
bus
.
get_object
(
BLUEZ_SERVICE_NAME
,
adapter
),
LE_ADVERTISING_MANAGER_IFACE
)
ad_manager
.
RegisterAdvertisement
(
self
.
get_path
(),
{},
reply_handler
=
self
.
register_ad_callback
,
error_handler
=
self
.
register_ad_error_callback
)
This diff is collapsed.
Click to expand it.
BLE/service.py
0 → 100644
+
315
−
0
View file @
967b9afa
"""
Copyright (c) 2019, Douglas Otwell
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the
"
Software
"
), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED
"
AS IS
"
, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import
dbus
import
dbus.mainloop.glib
import
dbus.exceptions
try
:
from
gi.repository
import
GObject
except
ImportError
:
import
gobject
as
GObject
from
bletools
import
BleTools
BLUEZ_SERVICE_NAME
=
"
org.bluez
"
GATT_MANAGER_IFACE
=
"
org.bluez.GattManager1
"
DBUS_OM_IFACE
=
"
org.freedesktop.DBus.ObjectManager
"
DBUS_PROP_IFACE
=
"
org.freedesktop.DBus.Properties
"
GATT_SERVICE_IFACE
=
"
org.bluez.GattService1
"
GATT_CHRC_IFACE
=
"
org.bluez.GattCharacteristic1
"
GATT_DESC_IFACE
=
"
org.bluez.GattDescriptor1
"
class
InvalidArgsException
(
dbus
.
exceptions
.
DBusException
):
_dbus_error_name
=
"
org.freedesktop.DBus.Error.InvalidArgs
"
class
NotSupportedException
(
dbus
.
exceptions
.
DBusException
):
_dbus_error_name
=
"
org.bluez.Error.NotSupported
"
class
NotPermittedException
(
dbus
.
exceptions
.
DBusException
):
_dbus_error_name
=
"
org.bluez.Error.NotPermitted
"
class
Application
(
dbus
.
service
.
Object
):
def
__init__
(
self
):
dbus
.
mainloop
.
glib
.
DBusGMainLoop
(
set_as_default
=
True
)
self
.
mainloop
=
GObject
.
MainLoop
()
self
.
bus
=
BleTools
.
get_bus
()
self
.
path
=
"
/
"
self
.
services
=
[]
self
.
next_index
=
0
dbus
.
service
.
Object
.
__init__
(
self
,
self
.
bus
,
self
.
path
)
def
get_path
(
self
):
return
dbus
.
ObjectPath
(
self
.
path
)
def
add_service
(
self
,
service
):
self
.
services
.
append
(
service
)
@dbus.service.method
(
DBUS_OM_IFACE
,
out_signature
=
"
a{oa{sa{sv}}}
"
)
def
GetManagedObjects
(
self
):
response
=
{}
for
service
in
self
.
services
:
response
[
service
.
get_path
()]
=
service
.
get_properties
()
chrcs
=
service
.
get_characteristics
()
for
chrc
in
chrcs
:
response
[
chrc
.
get_path
()]
=
chrc
.
get_properties
()
descs
=
chrc
.
get_descriptors
()
for
desc
in
descs
:
response
[
desc
.
get_path
()]
=
desc
.
get_properties
()
return
response
def
register_app_callback
(
self
):
print
(
"
GATT application registered
"
)
def
register_app_error_callback
(
self
,
error
):
print
(
"
Failed to register application:
"
+
str
(
error
))
def
register
(
self
):
adapter
=
BleTools
.
find_adapter
(
self
.
bus
)
service_manager
=
dbus
.
Interface
(
self
.
bus
.
get_object
(
BLUEZ_SERVICE_NAME
,
adapter
),
GATT_MANAGER_IFACE
)
service_manager
.
RegisterApplication
(
self
.
get_path
(),
{},
reply_handler
=
self
.
register_app_callback
,
error_handler
=
self
.
register_app_error_callback
)
def
run
(
self
):
self
.
mainloop
.
run
()
def
quit
(
self
):
print
(
"
\n
GATT application terminated
"
)
self
.
mainloop
.
quit
()
class
Service
(
dbus
.
service
.
Object
):
PATH_BASE
=
"
/org/bluez/example/service
"
def
__init__
(
self
,
index
,
uuid
,
primary
):
self
.
bus
=
BleTools
.
get_bus
()
self
.
path
=
self
.
PATH_BASE
+
str
(
index
)
self
.
uuid
=
uuid
self
.
primary
=
primary
self
.
characteristics
=
[]
self
.
next_index
=
0
dbus
.
service
.
Object
.
__init__
(
self
,
self
.
bus
,
self
.
path
)
def
get_properties
(
self
):
return
{
GATT_SERVICE_IFACE
:
{
'
UUID
'
:
self
.
uuid
,
'
Primary
'
:
self
.
primary
,
'
Characteristics
'
:
dbus
.
Array
(
self
.
get_characteristic_paths
(),
signature
=
'
o
'
)
}
}
def
get_path
(
self
):
return
dbus
.
ObjectPath
(
self
.
path
)
def
add_characteristic
(
self
,
characteristic
):
self
.
characteristics
.
append
(
characteristic
)
def
get_characteristic_paths
(
self
):
result
=
[]
for
chrc
in
self
.
characteristics
:
result
.
append
(
chrc
.
get_path
())
return
result
def
get_characteristics
(
self
):
return
self
.
characteristics
def
get_bus
(
self
):
return
self
.
bus
def
get_next_index
(
self
):
idx
=
self
.
next_index
self
.
next_index
+=
1
return
idx
@dbus.service.method
(
DBUS_PROP_IFACE
,
in_signature
=
'
s
'
,
out_signature
=
'
a{sv}
'
)
def
GetAll
(
self
,
interface
):
if
interface
!=
GATT_SERVICE_IFACE
:
raise
InvalidArgsException
()
return
self
.
get_properties
()[
GATT_SERVICE_IFACE
]
class
Characteristic
(
dbus
.
service
.
Object
):
"""
org.bluez.GattCharacteristic1 interface implementation
"""
def
__init__
(
self
,
uuid
,
flags
,
service
):
index
=
service
.
get_next_index
()
self
.
path
=
service
.
path
+
'
/char
'
+
str
(
index
)
self
.
bus
=
service
.
get_bus
()
self
.
uuid
=
uuid
self
.
service
=
service
self
.
flags
=
flags
self
.
descriptors
=
[]
self
.
next_index
=
0
dbus
.
service
.
Object
.
__init__
(
self
,
self
.
bus
,
self
.
path
)
def
get_properties
(
self
):
return
{
GATT_CHRC_IFACE
:
{
'
Service
'
:
self
.
service
.
get_path
(),
'
UUID
'
:
self
.
uuid
,
'
Flags
'
:
self
.
flags
,
'
Descriptors
'
:
dbus
.
Array
(
self
.
get_descriptor_paths
(),
signature
=
'
o
'
)
}
}
def
get_path
(
self
):
return
dbus
.
ObjectPath
(
self
.
path
)
def
add_descriptor
(
self
,
descriptor
):
self
.
descriptors
.
append
(
descriptor
)
def
get_descriptor_paths
(
self
):
result
=
[]
for
desc
in
self
.
descriptors
:
result
.
append
(
desc
.
get_path
())
return
result
def
get_descriptors
(
self
):
return
self
.
descriptors
@dbus.service.method
(
DBUS_PROP_IFACE
,
in_signature
=
'
s
'
,
out_signature
=
'
a{sv}
'
)
def
GetAll
(
self
,
interface
):
if
interface
!=
GATT_CHRC_IFACE
:
raise
InvalidArgsException
()
return
self
.
get_properties
()[
GATT_CHRC_IFACE
]
@dbus.service.method
(
GATT_CHRC_IFACE
,
in_signature
=
'
a{sv}
'
,
out_signature
=
'
ay
'
)
def
ReadValue
(
self
,
options
):
print
(
'
Default ReadValue called, returning error
'
)
raise
NotSupportedException
()
@dbus.service.method
(
GATT_CHRC_IFACE
,
in_signature
=
'
aya{sv}
'
)
def
WriteValue
(
self
,
value
,
options
):
print
(
'
Default WriteValue called, returning error
'
)
raise
NotSupportedException
()
@dbus.service.method
(
GATT_CHRC_IFACE
)
def
StartNotify
(
self
):
print
(
'
Default StartNotify called, returning error
'
)
raise
NotSupportedException
()
@dbus.service.method
(
GATT_CHRC_IFACE
)
def
StopNotify
(
self
):
print
(
'
Default StopNotify called, returning error
'
)
raise
NotSupportedException
()
@dbus.service.signal
(
DBUS_PROP_IFACE
,
signature
=
'
sa{sv}as
'
)
def
PropertiesChanged
(
self
,
interface
,
changed
,
invalidated
):
pass
def
get_bus
(
self
):
bus
=
self
.
bus
return
bus
def
get_next_index
(
self
):
idx
=
self
.
next_index
self
.
next_index
+=
1
return
idx
def
add_timeout
(
self
,
timeout
,
callback
):
GObject
.
timeout_add
(
timeout
,
callback
)
class
Descriptor
(
dbus
.
service
.
Object
):
def
__init__
(
self
,
uuid
,
flags
,
characteristic
):
index
=
characteristic
.
get_next_index
()
self
.
path
=
characteristic
.
path
+
'
/desc
'
+
str
(
index
)
self
.
uuid
=
uuid
self
.
flags
=
flags
self
.
chrc
=
characteristic
self
.
bus
=
characteristic
.
get_bus
()
dbus
.
service
.
Object
.
__init__
(
self
,
self
.
bus
,
self
.
path
)
def
get_properties
(
self
):
return
{
GATT_DESC_IFACE
:
{
'
Characteristic
'
:
self
.
chrc
.
get_path
(),
'
UUID
'
:
self
.
uuid
,
'
Flags
'
:
self
.
flags
,
}
}
def
get_path
(
self
):
return
dbus
.
ObjectPath
(
self
.
path
)
@dbus.service.method
(
DBUS_PROP_IFACE
,
in_signature
=
'
s
'
,
out_signature
=
'
a{sv}
'
)
def
GetAll
(
self
,
interface
):
if
interface
!=
GATT_DESC_IFACE
:
raise
InvalidArgsException
()
return
self
.
get_properties
()[
GATT_DESC_IFACE
]
@dbus.service.method
(
GATT_DESC_IFACE
,
in_signature
=
'
a{sv}
'
,
out_signature
=
'
ay
'
)
def
ReadValue
(
self
,
options
):
print
(
'
Default ReadValue called, returning error
'
)
raise
NotSupportedException
()
@dbus.service.method
(
GATT_DESC_IFACE
,
in_signature
=
'
aya{sv}
'
)
def
WriteValue
(
self
,
value
,
options
):
print
(
'
Default WriteValue called, returning error
'
)
raise
NotSupportedException
()
class
CharacteristicUserDescriptionDescriptor
(
Descriptor
):
CUD_UUID
=
'
2901
'
def
__init__
(
self
,
bus
,
index
,
characteristic
):
self
.
writable
=
'
writable-auxiliaries
'
in
characteristic
.
flags
self
.
value
=
array
.
array
(
'
B
'
,
b
'
This is a characteristic for testing
'
)
self
.
value
=
self
.
value
.
tolist
()
Descriptor
.
__init__
(
self
,
bus
,
index
,
self
.
CUD_UUID
,
[
'
read
'
,
'
write
'
],
characteristic
)
def
ReadValue
(
self
,
options
):
return
self
.
value
def
WriteValue
(
self
,
value
,
options
):
if
not
self
.
writable
:
raise
NotPermittedException
()
self
.
value
=
value
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment