# -*- coding: utf-8 -*-
import asyncio
import logging
from typing import List
import warnings
from bleak.backends.scanner import BaseBleakScanner, AdvertisementData
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from android.broadcast import BroadcastReceiver
from android.permissions import request_permissions, Permission
from jnius import cast, java_method
from . import defs
from . import utils
logger = logging.getLogger(__name__)
[docs]class BleakScannerP4Android(BaseBleakScanner):
"""
The python-for-android Bleak BLE Scanner.
Args:
**detection_callback (callable or coroutine):
Optional function that will be called each time a device is
discovered or advertising data has changed.
**service_uuids (List[str]):
Optional list of service UUIDs to filter on. Only advertisements
containing this advertising data will be received. Specifying this
also enables scanning while the screen is off on Android.
"""
__scanner = None
def __init__(self, **kwargs):
super(BleakScannerP4Android, self).__init__(**kwargs)
self._devices = {}
self.__adapter = None
self.__javascanner = None
self.__callback = None
def __del__(self):
self.__stop()
[docs] async def start(self):
if BleakScannerP4Android.__scanner is not None:
raise BleakError("A BleakScanner is already scanning on this adapter.")
logger.debug("Starting BTLE scan")
loop = asyncio.get_event_loop()
if self.__javascanner is None:
if self.__callback is None:
self.__callback = _PythonScanCallback(self, loop)
permission_acknowledged = loop.create_future()
def handle_permissions(permissions, grantResults):
if any(grantResults):
loop.call_soon_threadsafe(
permission_acknowledged.set_result, grantResults
)
else:
loop.call_soon_threadsafe(
permission_acknowledged.set_exception(
BleakError("User denied access to " + str(permissions))
)
)
request_permissions(
[
Permission.ACCESS_FINE_LOCATION,
Permission.ACCESS_COARSE_LOCATION,
"android.permission.ACCESS_BACKGROUND_LOCATION",
],
handle_permissions,
)
await permission_acknowledged
self.__adapter = defs.BluetoothAdapter.getDefaultAdapter()
if self.__adapter is None:
raise BleakError("Bluetooth is not supported on this hardware platform")
if self.__adapter.getState() != defs.BluetoothAdapter.STATE_ON:
raise BleakError("Bluetooth is not turned on")
self.__javascanner = self.__adapter.getBluetoothLeScanner()
BleakScannerP4Android.__scanner = self
filters = cast("java.util.List", defs.List())
if self._service_uuids:
for uuid in self._service_uuids:
filters.add(
defs.ScanFilterBuilder()
.setServiceUuid(defs.ParcelUuid.fromString(uuid))
.build()
)
scanfuture = self.__callback.perform_and_wait(
dispatchApi=self.__javascanner.startScan,
dispatchParams=(
filters,
defs.ScanSettingsBuilder()
.setScanMode(defs.ScanSettings.SCAN_MODE_LOW_LATENCY)
.setReportDelay(0)
.setPhy(defs.ScanSettings.PHY_LE_ALL_SUPPORTED)
.setNumOfMatches(defs.ScanSettings.MATCH_NUM_MAX_ADVERTISEMENT)
.setMatchMode(defs.ScanSettings.MATCH_MODE_AGGRESSIVE)
.setCallbackType(defs.ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
.build(),
self.__callback.java,
),
resultApi="onScan",
return_indicates_status=False,
)
self.__javascanner.flushPendingScanResults(self.__callback.java)
try:
await asyncio.wait_for(scanfuture, timeout=0.2)
except asyncio.exceptions.TimeoutError:
pass
except BleakError as bleakerror:
await self.stop()
if bleakerror.args != (
"onScan",
"SCAN_FAILED_APPLICATION_REGISTRATION_FAILED",
):
raise bleakerror
else:
# there might be a clearer solution to this if android source and vendor
# documentation are reviewed for the meaning of the error
# https://stackoverflow.com/questions/27516399/solution-for-ble-scans-scan-failed-application-registration-failed
warnings.warn(
"BT API gave SCAN_FAILED_APPLICATION_REGISTRATION_FAILED. Resetting adapter."
)
def handlerWaitingForState(state, stateFuture):
def handleAdapterStateChanged(context, intent):
adapter_state = intent.getIntExtra(
defs.BluetoothAdapter.EXTRA_STATE,
defs.BluetoothAdapter.STATE_ERROR,
)
if adapter_state == defs.BluetoothAdapter.STATE_ERROR:
loop.call_soon_threadsafe(
stateOffFuture.set_exception,
BleakError(f"Unexpected adapter state {adapter_state}"),
)
elif adapter_state == state:
loop.call_soon_threadsafe(
stateFuture.set_result, adapter_state
)
return handleAdapterStateChanged
logger.info(
"disabling bluetooth adapter to handle SCAN_FAILED_APPLICATION_REGSTRATION_FAILED ..."
)
stateOffFuture = loop.create_future()
receiver = BroadcastReceiver(
handlerWaitingForState(
defs.BluetoothAdapter.STATE_OFF, stateOffFuture
),
actions=[defs.BluetoothAdapter.ACTION_STATE_CHANGED],
)
receiver.start()
try:
self.__adapter.disable()
await stateOffFuture
finally:
receiver.stop()
logger.info("re-enabling bluetooth adapter ...")
stateOnFuture = loop.create_future()
receiver = BroadcastReceiver(
handlerWaitingForState(
defs.BluetoothAdapter.STATE_ON, stateOnFuture
),
actions=[defs.BluetoothAdapter.ACTION_STATE_CHANGED],
)
receiver.start()
try:
self.__adapter.enable()
await stateOnFuture
finally:
receiver.stop()
logger.debug("restarting scan ...")
return await self.start()
def __stop(self):
if self.__javascanner is not None:
logger.debug("Stopping BTLE scan")
self.__javascanner.stopScan(self.__callback.java)
BleakScannerP4Android.__scanner = None
self.__javascanner = None
else:
logger.debug("BTLE scan already stopped")
[docs] async def stop(self):
self.__stop()
[docs] def set_scanning_filter(self, **kwargs):
# If we do end up implementing this, this should accept List<ScanFilter>
# and ScanSettings java objects to pass to startScan().
raise NotImplementedError("not implemented in Android backend")
@property
def discovered_devices(self) -> List[BLEDevice]:
return [*self._devices.values()]
class _PythonScanCallback(utils.AsyncJavaCallbacks):
__javainterfaces__ = ["com.github.hbldh.bleak.PythonScanCallback$Interface"]
def __init__(self, scanner, loop):
super().__init__(loop)
self._scanner = scanner
self.java = defs.PythonScanCallback(self)
def result_state(self, status_str, name, *data):
self._loop.call_soon_threadsafe(
self._result_state_unthreadsafe, status_str, name, data
)
@java_method("(I)V")
def onScanFailed(self, errorCode):
self.result_state(defs.ScanFailed(errorCode).name, "onScan")
@java_method("(Landroid/bluetooth/le/ScanResult;)V")
def onScanResult(self, result):
device = result.getDevice()
record = result.getScanRecord()
service_uuids = record.getServiceUuids()
if service_uuids is not None:
service_uuids = [service_uuid.toString() for service_uuid in service_uuids]
manufacturer_data = record.getManufacturerSpecificData()
manufacturer_data = {
manufacturer_data.keyAt(index): bytes(manufacturer_data.valueAt(index))
for index in range(manufacturer_data.size())
}
service_data = {
entry.getKey().toString(): bytes(entry.getValue())
for entry in record.getServiceData().entrySet()
}
advertisement = AdvertisementData(
local_name=record.getDeviceName(),
manufacturer_data=manufacturer_data,
service_data=service_data,
service_uuids=service_uuids,
platform_data=(result,),
)
device = BLEDevice(
device.getAddress(),
device.getName(),
rssi=result.getRssi(),
uuids=service_uuids,
manufacturer_data=manufacturer_data,
)
self._scanner._devices[device.address] = device
if "onScan" not in self.states:
self.result_state(None, "onScan", device)
if self._scanner._callback:
self._loop.call_soon_threadsafe(
self._scanner._callback, device, advertisement
)