Source code for bleak.backends.p4android.scanner

# -*- coding: utf-8 -*-

import asyncio
import logging
from typing import List
import warnings

from bleak.backends.scanner import BaseBleakScanner, AdvertisementData
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError

from android.broadcast import BroadcastReceiver
from android.permissions import request_permissions, Permission
from jnius import cast, java_method

from . import defs
from . import utils

logger = logging.getLogger(__name__)


class BleakScannerP4Android(BaseBleakScanner):
    """
    The python-for-android Bleak BLE Scanner.

    Args:
        **detection_callback (callable or coroutine):
            Optional function that will be called each time a device is
            discovered or advertising data has changed.
        **service_uuids (List[str]):
            Optional list of service UUIDs to filter on. Only advertisements
            containing this advertising data will be received. Specifying this
            also enables scanning while the screen is off on Android.
    """

    # Class-wide marker for the instance whose scan is currently running.
    # start() refuses to run while it is set; __stop() clears it.
    __scanner = None

    def __init__(self, **kwargs):
        super(BleakScannerP4Android, self).__init__(**kwargs)
        # Maps device address -> BLEDevice; filled in by
        # _PythonScanCallback.onScanResult.
        self._devices = {}
        self.__adapter = None
        self.__javascanner = None
        self.__callback = None

    def __del__(self):
        # Make sure the Java-side scan does not outlive this object.
        self.__stop()

    async def __request_location_permissions(self, loop):
        """Request the location permissions and wait for the answer.

        Args:
            loop: Event loop that owns the future being awaited.

        Raises:
            BleakError: If none of the requested permissions was granted.
        """
        permission_acknowledged = loop.create_future()

        def handle_permissions(permissions, grantResults):
            # Always hand the future's setter to call_soon_threadsafe as a
            # callable plus argument: invoking the setter right here would
            # touch the future outside the loop and schedule ``None``.
            if any(grantResults):
                loop.call_soon_threadsafe(
                    permission_acknowledged.set_result, grantResults
                )
            else:
                loop.call_soon_threadsafe(
                    permission_acknowledged.set_exception,
                    BleakError("User denied access to " + str(permissions)),
                )

        request_permissions(
            [
                Permission.ACCESS_FINE_LOCATION,
                Permission.ACCESS_COARSE_LOCATION,
                "android.permission.ACCESS_BACKGROUND_LOCATION",
            ],
            handle_permissions,
        )
        await permission_acknowledged

    async def __set_adapter_power(self, loop, enabled):
        """Switch the adapter on or off and wait for the matching state.

        Args:
            loop: Event loop that owns the future being awaited.
            enabled: ``True`` to call ``enable()`` and wait for ``STATE_ON``,
                ``False`` to call ``disable()`` and wait for ``STATE_OFF``.

        Raises:
            BleakError: If a state-change broadcast carries ``STATE_ERROR``
                (also the value used when the state extra is missing).
        """
        if enabled:
            target_state = defs.BluetoothAdapter.STATE_ON
        else:
            target_state = defs.BluetoothAdapter.STATE_OFF
        state_future = loop.create_future()

        def handle_adapter_state_changed(context, intent):
            adapter_state = intent.getIntExtra(
                defs.BluetoothAdapter.EXTRA_STATE,
                defs.BluetoothAdapter.STATE_ERROR,
            )
            if adapter_state == defs.BluetoothAdapter.STATE_ERROR:
                # Fail the future of *this* wait, never one from an
                # earlier (already completed) wait.
                loop.call_soon_threadsafe(
                    state_future.set_exception,
                    BleakError(f"Unexpected adapter state {adapter_state}"),
                )
            elif adapter_state == target_state:
                loop.call_soon_threadsafe(state_future.set_result, adapter_state)

        receiver = BroadcastReceiver(
            handle_adapter_state_changed,
            actions=[defs.BluetoothAdapter.ACTION_STATE_CHANGED],
        )
        receiver.start()
        try:
            if enabled:
                self.__adapter.enable()
            else:
                self.__adapter.disable()
            await state_future
        finally:
            receiver.stop()

    async def start(self) -> None:
        """Start scanning for BLE advertisements.

        Requests the location permissions, obtains the default adapter and
        its LE scanner, builds one scan filter per configured service UUID
        and starts a low-latency scan. If the scan fails with
        ``SCAN_FAILED_APPLICATION_REGISTRATION_FAILED`` the adapter is
        switched off and on again and the scan is restarted.

        Raises:
            BleakError: If another scanner is already scanning, the
                permissions were denied, there is no Bluetooth adapter, the
                adapter is not turned on, or the scan fails with any other
                error.
        """
        if BleakScannerP4Android.__scanner is not None:
            raise BleakError("A BleakScanner is already scanning on this adapter.")

        logger.debug("Starting BTLE scan")

        loop = asyncio.get_event_loop()

        if self.__javascanner is None:
            if self.__callback is None:
                self.__callback = _PythonScanCallback(self, loop)

            await self.__request_location_permissions(loop)

            self.__adapter = defs.BluetoothAdapter.getDefaultAdapter()
            if self.__adapter is None:
                raise BleakError("Bluetooth is not supported on this hardware platform")
            if self.__adapter.getState() != defs.BluetoothAdapter.STATE_ON:
                raise BleakError("Bluetooth is not turned on")

            self.__javascanner = self.__adapter.getBluetoothLeScanner()

        BleakScannerP4Android.__scanner = self

        filters = cast("java.util.List", defs.List())
        if self._service_uuids:
            for uuid in self._service_uuids:
                filters.add(
                    defs.ScanFilterBuilder()
                    .setServiceUuid(defs.ParcelUuid.fromString(uuid))
                    .build()
                )

        scanfuture = self.__callback.perform_and_wait(
            dispatchApi=self.__javascanner.startScan,
            dispatchParams=(
                filters,
                defs.ScanSettingsBuilder()
                .setScanMode(defs.ScanSettings.SCAN_MODE_LOW_LATENCY)
                .setReportDelay(0)
                .setPhy(defs.ScanSettings.PHY_LE_ALL_SUPPORTED)
                .setNumOfMatches(defs.ScanSettings.MATCH_NUM_MAX_ADVERTISEMENT)
                .setMatchMode(defs.ScanSettings.MATCH_MODE_AGGRESSIVE)
                .setCallbackType(defs.ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
                .build(),
                self.__callback.java,
            ),
            resultApi="onScan",
            return_indicates_status=False,
        )
        self.__javascanner.flushPendingScanResults(self.__callback.java)

        try:
            # Only wait briefly: running into the timeout is not treated as
            # an error, the scan is simply left running.
            await asyncio.wait_for(scanfuture, timeout=0.2)
        except asyncio.TimeoutError:
            pass
        except BleakError as bleakerror:
            await self.stop()
            if bleakerror.args != (
                "onScan",
                "SCAN_FAILED_APPLICATION_REGISTRATION_FAILED",
            ):
                raise

            # there might be a clearer solution to this if android source and vendor
            # documentation are reviewed for the meaning of the error
            # https://stackoverflow.com/questions/27516399/solution-for-ble-scans-scan-failed-application-registration-failed
            warnings.warn(
                "BT API gave SCAN_FAILED_APPLICATION_REGISTRATION_FAILED.  Resetting adapter."
            )

            logger.info(
                "disabling bluetooth adapter to handle SCAN_FAILED_APPLICATION_REGSTRATION_FAILED ..."
            )
            await self.__set_adapter_power(loop, False)

            logger.info("re-enabling bluetooth adapter ...")
            await self.__set_adapter_power(loop, True)

            logger.debug("restarting scan ...")

            return await self.start()

    def __stop(self):
        """Stop the Java-side scan, if one was started, and release the class-wide marker."""
        if self.__javascanner is not None:
            logger.debug("Stopping BTLE scan")
            self.__javascanner.stopScan(self.__callback.java)
            BleakScannerP4Android.__scanner = None
            self.__javascanner = None
        else:
            logger.debug("BTLE scan already stopped")

    async def stop(self) -> None:
        """Stop scanning; does nothing but log if no scan is running."""
        self.__stop()

    def set_scanning_filter(self, **kwargs):
        """Not supported by this backend.

        Raises:
            NotImplementedError: Always.
        """
        # If we do end up implementing this, this should accept List<ScanFilter>
        # and ScanSettings java objects to pass to startScan().
        raise NotImplementedError("not implemented in Android backend")

    @property
    def discovered_devices(self) -> List[BLEDevice]:
        """A new list with the most recently seen ``BLEDevice`` per address."""
        return list(self._devices.values())
class _PythonScanCallback(utils.AsyncJavaCallbacks):
    """Python implementation of the Java ``PythonScanCallback`` interface.

    Converts scan events delivered by the Java side into bleak objects and
    forwards them to the owning scanner through the asyncio event loop.

    NOTE(review): ``self._loop``, ``self.states`` and
    ``self._result_state_unthreadsafe`` are not defined here; presumably
    they come from ``utils.AsyncJavaCallbacks`` -- confirm.
    """

    __javainterfaces__ = ["com.github.hbldh.bleak.PythonScanCallback$Interface"]

    def __init__(self, scanner, loop):
        """Bind the callback to ``scanner`` and create its Java counterpart."""
        super().__init__(loop)
        self._scanner = scanner
        self.java = defs.PythonScanCallback(self)

    def result_state(self, status_str, name, *data):
        """Schedule ``_result_state_unthreadsafe(status_str, name, data)`` on the loop."""
        self._loop.call_soon_threadsafe(
            self._result_state_unthreadsafe, status_str, name, data
        )

    @java_method("(I)V")
    def onScanFailed(self, errorCode):
        """Report a failed scan under the ``"onScan"`` result name."""
        failure_name = defs.ScanFailed(errorCode).name
        self.result_state(failure_name, "onScan")

    @java_method("(Landroid/bluetooth/le/ScanResult;)V")
    def onScanResult(self, result):
        """Record one scan result and notify the detection callback, if any."""
        java_device = result.getDevice()
        record = result.getScanRecord()

        # Stays None when the record carries no service UUIDs.
        service_uuids = record.getServiceUuids()
        if service_uuids is not None:
            uuid_strings = []
            for java_uuid in service_uuids:
                uuid_strings.append(java_uuid.toString())
            service_uuids = uuid_strings

        # The Java side returns an index-addressed container (keyAt/valueAt/size);
        # flatten it into a plain dict.
        sparse_data = record.getManufacturerSpecificData()
        manufacturer_data = {}
        for position in range(sparse_data.size()):
            company_id = sparse_data.keyAt(position)
            manufacturer_data[company_id] = bytes(sparse_data.valueAt(position))

        service_data = {}
        for entry in record.getServiceData().entrySet():
            data_uuid = entry.getKey().toString()
            service_data[data_uuid] = bytes(entry.getValue())

        advertisement = AdvertisementData(
            local_name=record.getDeviceName(),
            manufacturer_data=manufacturer_data,
            service_data=service_data,
            service_uuids=service_uuids,
            platform_data=(result,),
        )
        device = BLEDevice(
            java_device.getAddress(),
            java_device.getName(),
            rssi=result.getRssi(),
            uuids=service_uuids,
            manufacturer_data=manufacturer_data,
        )
        self._scanner._devices[device.address] = device

        # Only report the "onScan" result while no state is recorded for it yet.
        if "onScan" not in self.states:
            self.result_state(None, "onScan", device)

        detection_callback = self._scanner._callback
        if detection_callback:
            self._loop.call_soon_threadsafe(
                detection_callback, device, advertisement
            )