import logging
from typing import Any, Dict, List, Optional
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType, MessageType
from dbus_next.message import Message
from dbus_next.signature import Variant
from bleak.backends.bluezdbus import defs
from bleak.backends.bluezdbus.signals import MatchRules, add_match, remove_match
from bleak.backends.bluezdbus.utils import (
assert_reply,
unpack_variants,
validate_address,
)
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import BaseBleakScanner, AdvertisementData
logger = logging.getLogger(__name__)
# set of org.bluez.Device1 property names that come from advertising data
_ADVERTISING_DATA_PROPERTIES = {
"AdvertisingData",
"AdvertisingFlags",
"ManufacturerData",
"Name",
"ServiceData",
"UUIDs",
}
def _device_info(path, props):
try:
name = props.get("Alias", "Unknown")
address = props.get("Address", None)
if address is None:
try:
address = path[-17:].replace("_", ":")
if not validate_address(address):
address = None
except Exception:
address = None
rssi = props.get("RSSI", "?")
return name, address, rssi, path
except Exception:
return None, None, None, None
[docs]class BleakScannerBlueZDBus(BaseBleakScanner):
"""The native Linux Bleak BLE Scanner.
For possible values for `filters`, see the parameters to the
``SetDiscoveryFilter`` method in the `BlueZ docs
<https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/adapter-api.txt?h=5.48&id=0d1e3b9c5754022c779da129025d493a198d49cf>`_
Args:
**detection_callback (callable or coroutine):
Optional function that will be called each time a device is
discovered or advertising data has changed.
**service_uuids (List[str]):
Optional list of service UUIDs to filter on. Only advertisements
containing this advertising data will be received. Specifying this
also enables scanning while the screen is off on Android.
**adapter (str):
Bluetooth adapter to use for discovery.
**filters (dict):
A dict of filters to be applied on discovery.
"""
def __init__(self, **kwargs):
super(BleakScannerBlueZDBus, self).__init__(**kwargs)
# kwarg "device" is for backwards compatibility
self._adapter = kwargs.get("adapter", kwargs.get("device", "hci0"))
self._bus: Optional[MessageBus] = None
self._cached_devices: Dict[str, Variant] = {}
self._devices: Dict[str, Dict[str, Any]] = {}
self._rules: List[MatchRules] = []
self._adapter_path: str = f"/org/bluez/{self._adapter}"
# Discovery filters
self._filters: Dict[str, Variant] = {}
if self._service_uuids:
self._filters["UUIDs"] = Variant("as", self._service_uuids)
self.set_scanning_filter(**kwargs)
[docs] async def start(self):
self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
self._devices.clear()
self._cached_devices.clear()
# Add signal listeners
self._bus.add_message_handler(self._parse_msg)
rules = MatchRules(
interface=defs.OBJECT_MANAGER_INTERFACE,
member="InterfacesAdded",
arg0path=f"{self._adapter_path}/",
)
reply = await add_match(self._bus, rules)
assert_reply(reply)
self._rules.append(rules)
rules = MatchRules(
interface=defs.OBJECT_MANAGER_INTERFACE,
member="InterfacesRemoved",
arg0path=f"{self._adapter_path}/",
)
reply = await add_match(self._bus, rules)
assert_reply(reply)
self._rules.append(rules)
rules = MatchRules(
interface=defs.PROPERTIES_INTERFACE,
member="PropertiesChanged",
path_namespace=self._adapter_path,
)
reply = await add_match(self._bus, rules)
assert_reply(reply)
self._rules.append(rules)
# Find the HCI device to use for scanning and get cached device properties
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path="/",
member="GetManagedObjects",
interface=defs.OBJECT_MANAGER_INTERFACE,
)
)
assert_reply(reply)
# get only the device interface
self._cached_devices = {
path: unpack_variants(interfaces[defs.DEVICE_INTERFACE])
for path, interfaces in reply.body[0].items()
if defs.DEVICE_INTERFACE in interfaces
}
logger.debug(f"cached devices: {self._cached_devices}")
# Apply the filters
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=self._adapter_path,
interface=defs.ADAPTER_INTERFACE,
member="SetDiscoveryFilter",
signature="a{sv}",
body=[self._filters],
)
)
assert_reply(reply)
# Start scanning
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=self._adapter_path,
interface=defs.ADAPTER_INTERFACE,
member="StartDiscovery",
)
)
assert_reply(reply)
[docs] async def stop(self):
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=self._adapter_path,
interface=defs.ADAPTER_INTERFACE,
member="StopDiscovery",
)
)
assert_reply(reply)
for rule in self._rules:
await remove_match(self._bus, rule)
self._rules.clear()
self._bus.remove_message_handler(self._parse_msg)
# Try to disconnect the System Bus.
try:
self._bus.disconnect()
except Exception as e:
logger.error("Attempt to disconnect system bus failed: {0}".format(e))
self._bus = None
[docs] def set_scanning_filter(self, **kwargs):
"""Sets OS level scanning filters for the BleakScanner.
For possible values for `filters`, see the parameters to the
``SetDiscoveryFilter`` method in the `BlueZ docs
<https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/adapter-api.txt?h=5.48&id=0d1e3b9c5754022c779da129025d493a198d49cf>`_
See variant types here: <https://python-dbus-next.readthedocs.io/en/latest/type-system/>
Keyword Args:
filters (dict): A dict of filters to be applied on discovery.
"""
for k, v in kwargs.get("filters", {}).items():
if k == "UUIDs":
self._filters[k] = Variant("as", v)
elif k == "RSSI":
self._filters[k] = Variant("n", v)
elif k == "Pathloss":
self._filters[k] = Variant("n", v)
elif k == "Transport":
self._filters[k] = Variant("s", v)
elif k == "DuplicateData":
self._filters[k] = Variant("b", v)
elif k == "Discoverable":
self._filters[k] = Variant("b", v)
elif k == "Pattern":
self._filters[k] = Variant("s", v)
else:
logger.warning("Filter '%s' is not currently supported." % k)
if "Transport" not in self._filters:
self._filters["Transport"] = Variant("s", "le")
@property
def discovered_devices(self) -> List[BLEDevice]:
# Reduce output.
discovered_devices = []
for path, props in self._devices.items():
if not props:
logger.debug(
"Disregarding %s since no properties could be obtained." % path
)
continue
name, address, _, path = _device_info(path, props)
if address is None:
continue
uuids = props.get("UUIDs", [])
manufacturer_data = props.get("ManufacturerData", {})
discovered_devices.append(
BLEDevice(
address,
name,
{"path": path, "props": props},
props.get("RSSI", 0),
uuids=uuids,
manufacturer_data=manufacturer_data,
)
)
return discovered_devices
# Helper methods
def _invoke_callback(self, path: str, message: Message) -> None:
"""Invokes the advertising data callback.
Args:
message: The D-Bus message that triggered the callback.
"""
if self._callback is None:
return
props = self._devices[path]
# Get all the information wanted to pack in the advertisement data
_local_name = props.get("Name")
_manufacturer_data = {
k: bytes(v) for k, v in props.get("ManufacturerData", {}).items()
}
_service_data = {k: bytes(v) for k, v in props.get("ServiceData", {}).items()}
_service_uuids = props.get("UUIDs", [])
# Pack the advertisement data
advertisement_data = AdvertisementData(
local_name=_local_name,
manufacturer_data=_manufacturer_data,
service_data=_service_data,
service_uuids=_service_uuids,
platform_data=(props, message),
)
device = BLEDevice(
props["Address"],
props["Alias"],
{"path": path, "props": props},
props.get("RSSI", 0),
uuids=_service_uuids,
manufacturer_data=_manufacturer_data,
)
self._callback(device, advertisement_data)
def _parse_msg(self, message: Message):
if message.message_type != MessageType.SIGNAL:
return
logger.debug(
"received D-Bus signal: {0}.{1} ({2}): {3}".format(
message.interface, message.member, message.path, message.body
)
)
if message.member == "InterfacesAdded":
# if a new device is discovered while we are scanning, add it to
# the discovered devices list
obj_path: str
interfaces_and_props: Dict[str, Dict[str, Variant]]
obj_path, interfaces_and_props = message.body
device_props = unpack_variants(
interfaces_and_props.get(defs.DEVICE_INTERFACE, {})
)
if device_props:
self._devices[obj_path] = device_props
self._invoke_callback(obj_path, message)
elif message.member == "InterfacesRemoved":
# if a device disappears while we are scanning, remove it from the
# discovered devices list
obj_path: str
interfaces: List[str]
obj_path, interfaces = message.body
if defs.DEVICE_INTERFACE in interfaces:
# Using pop to avoid KeyError if obj_path does not exist
self._devices.pop(obj_path, None)
elif message.member == "PropertiesChanged":
# Property change events basically mean that new advertising data
# was received or the RSSI changed. Either way, it lets us know
# that the device is active and we can add it to the discovered
# devices list.
interface: str
changed: Dict[str, Variant]
invalidated: List[str]
interface, changed, invalidated = message.body
if interface != defs.DEVICE_INTERFACE:
return
first_time_seen = False
if message.path not in self._devices:
if message.path not in self._cached_devices:
# This can happen when we start scanning. The "PropertyChanged"
# handler is attached before "GetManagedObjects" is called
# and so self._cached_devices is not assigned yet.
# This is not a problem. We just discard the property value
# since "GetManagedObjects" will return a newer value.
return
first_time_seen = True
self._devices[message.path] = self._cached_devices[message.path]
changed = unpack_variants(changed)
self._devices[message.path].update(changed)
# Only do advertising data callback if this is the first time the
# device has been seen or if an advertising data property changed.
# Otherwise we get a flood of callbacks from RSSI changing.
if first_time_seen or not _ADVERTISING_DATA_PROPERTIES.isdisjoint(
changed.keys()
):
self._invoke_callback(message.path, message)