Source code for bleak.backends.bluezdbus.utils

# -*- coding: utf-8 -*-
import re
from typing import Any, Dict

from dbus_next.constants import MessageType
from dbus_next.message import Message
from dbus_next.signature import Variant

from bleak import BleakError
from bleak.exc import BleakDBusError

# Matches a MAC-style device address: six two-digit hexadecimal octets
# separated by ":" or "-" (the pattern allows the two separators to be mixed
# within a single address). Hex digits may be upper or lower case.
_address_regex = re.compile("^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$")


def assert_reply(reply: Message):
    """Checks that a D-Bus message is a valid reply.

    Args:
        reply: The message received in response to a D-Bus method call.

    Raises:
        BleakDBusError: if the message type is ``MessageType.ERROR``
        AssertionError: if the message type is not ``MessageType.METHOD_RETURN``
    """
    if reply.message_type == MessageType.ERROR:
        raise BleakDBusError(reply.error_name, reply.body)

    # Raise explicitly instead of using ``assert``: assert statements are
    # removed when Python runs with -O, which would let an unexpected message
    # type pass through unnoticed.
    if reply.message_type != MessageType.METHOD_RETURN:
        raise AssertionError(
            f"Unexpected D-Bus message type: {reply.message_type}"
        )
def validate_address(address) -> bool:
    """Return whether ``address`` is a well-formed MAC-style address.

    A valid address consists of six two-digit hexadecimal octets separated by
    ``:`` or ``-``, e.g. ``"AA:BB:CC:DD:EE:FF"``.

    Args:
        address: The address string to check.

    Returns:
        ``True`` if the whole string matches the address pattern, otherwise
        ``False``.
    """
    # ``fullmatch`` is used rather than ``match`` because the pattern's ``$``
    # anchor also matches just before a trailing newline, which would let
    # "AA:BB:CC:DD:EE:FF\n" through as a valid address.
    return _address_regex.fullmatch(address) is not None
def unpack_variants(dictionary: Dict[str, Variant]) -> Dict[str, Any]:
    """Convert the ``Variant`` values of a D-Bus dictionary to plain Python values.

    ``dbus-next`` leaves ``Variant`` wrappers in place, so this should be
    applied to every dictionary ("a{sv}") returned from a D-Bus message.

    Each value is unwrapped if it is a ``Variant``. Afterwards:

    * a ``dict`` value is processed recursively with this same function;
    * a ``list`` value has each ``Variant`` element replaced by its ``.value``
      (one level only - the elements themselves are not descended into);
    * any other value is stored as is.

    Args:
        dictionary: Mapping whose values may be ``Variant`` instances.

    Returns:
        A new dictionary with the same keys and the unwrapped values.
    """
    result = {}
    for key, raw in dictionary.items():
        value = raw.value if isinstance(raw, Variant) else raw

        if isinstance(value, dict):
            result[key] = unpack_variants(value)
        elif isinstance(value, list):
            result[key] = [
                item.value if isinstance(item, Variant) else item
                for item in value
            ]
        else:
            result[key] = value
    return result
def extract_service_handle_from_path(path):
    """Parse the service handle from the tail of a D-Bus object path.

    The last four characters of ``path`` are interpreted as a hexadecimal
    number.

    Args:
        path: The object path whose final four characters encode the handle.

    Returns:
        The handle as an ``int``.

    Raises:
        BleakError: if slicing or hexadecimal conversion fails for any reason;
            the underlying exception is attached as the cause.
    """
    try:
        handle_hex = path[-4:]
        handle = int(handle_hex, 16)
    except Exception as exc:
        raise BleakError(f"Could not parse service handle from path: {path}") from exc
    return handle