- 
  Notifications
 You must be signed in to change notification settings 
- Fork 1.1k
aioble: Add pairing and bonding multitests #1021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
 
  Open
 
 
 
 
  Open
 Changes from all commits
 Commits
 
 
 File filter
Filter by extension
Conversations
 Failed to load comments. 
 
 
 
  Loading
 
 Jump to
 
 Jump to file
 
 
 
 Failed to load files. 
 
 
 
  Loading
 
 Diff view
Diff view
There are no files selected for viewing
 
 
 
 159 changes: 159 additions & 0 deletions
 
 
 
 micropython/bluetooth/aioble/multitests/ble_bond.py
 
 
 
 
  
 
 
 
 
 
 
 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
 Learn more about bidirectional Unicode characters
 
 
 
 
 | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| # Test BLE GAP pairing with bonding (persistent pairing) using aioble | ||
|  | ||
| import sys | ||
|  | ||
| # ruff: noqa: E402 | ||
| sys.path.append("") | ||
|  | ||
| from micropython import const | ||
| import machine | ||
| import time | ||
| import os | ||
|  | ||
| import asyncio | ||
| import aioble | ||
| import aioble.security | ||
| import bluetooth | ||
|  | ||
| TIMEOUT_MS = 5000 | ||
|  | ||
| SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A") | ||
| CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444") | ||
|  | ||
| _FLAG_READ = const(0x0002) | ||
| _FLAG_READ_ENCRYPTED = const(0x0200) | ||
|  | ||
|  | ||
| # For aioble, we need to directly use the low-level bluetooth API for encrypted characteristics | ||
| class EncryptedCharacteristic(aioble.Characteristic): | ||
| def __init__(self, service, uuid, **kwargs): | ||
| super().__init__(service, uuid, read=True, **kwargs) | ||
| # Override flags to add encryption requirement | ||
| self.flags |= _FLAG_READ_ENCRYPTED | ||
|  | ||
|  | ||
| # Acting in peripheral role. | ||
| async def instance0_task(): | ||
| # Clean up any existing secrets from previous tests | ||
| try: | ||
| os.remove("ble_secrets.json") | ||
| except: | ||
| pass | ||
|  | ||
| # Load secrets (will be empty initially but enables bond storage) | ||
| aioble.security.load_secrets() | ||
|  | ||
| service = aioble.Service(SERVICE_UUID) | ||
| characteristic = EncryptedCharacteristic(service, CHAR_UUID) | ||
| aioble.register_services(service) | ||
|  | ||
| multitest.globals(BDADDR=aioble.config("mac")) | ||
| multitest.next() | ||
|  | ||
| # Write initial characteristic value. | ||
| characteristic.write("bonded_data") | ||
|  | ||
| # Wait for central to connect to us. | ||
| print("advertise") | ||
| connection = await aioble.advertise( | ||
| 20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS | ||
| ) | ||
| print("connected") | ||
|  | ||
| # Wait for pairing to complete | ||
| print("wait_for_bonding") | ||
| start_time = time.ticks_ms() | ||
| while not connection.encrypted and time.ticks_diff(time.ticks_ms(), start_time) < TIMEOUT_MS: | ||
| await asyncio.sleep_ms(100) | ||
|  | ||
| # Give additional time for bonding to complete after encryption | ||
| await asyncio.sleep_ms(500) | ||
|  | ||
| if connection.encrypted: | ||
| print( | ||
| "bonded encrypted=1 authenticated={} bonded={}".format( | ||
| 1 if connection.authenticated else 0, 1 if connection.bonded else 0 | ||
| ) | ||
| ) | ||
| else: | ||
| print("bonding_timeout") | ||
|  | ||
| # Wait for the central to disconnect. | ||
| await connection.disconnected(timeout_ms=TIMEOUT_MS) | ||
| print("disconnected") | ||
|  | ||
|  | ||
| def instance0(): | ||
| try: | ||
| asyncio.run(instance0_task()) | ||
| finally: | ||
| aioble.stop() | ||
|  | ||
|  | ||
| # Acting in central role. | ||
| async def instance1_task(): | ||
| multitest.next() | ||
|  | ||
| # Clean up any existing secrets from previous tests | ||
| try: | ||
| os.remove("ble_secrets.json") | ||
| except: | ||
| pass | ||
|  | ||
| # Load secrets (will be empty initially but enables bond storage) | ||
| aioble.security.load_secrets() | ||
|  | ||
| # Connect to peripheral. | ||
| print("connect") | ||
| device = aioble.Device(*BDADDR) | ||
| connection = await device.connect(timeout_ms=TIMEOUT_MS) | ||
|  | ||
| # Discover characteristics (before pairing). | ||
| service = await connection.service(SERVICE_UUID) | ||
| print("service", service.uuid) | ||
| characteristic = await service.characteristic(CHAR_UUID) | ||
| print("characteristic", characteristic.uuid) | ||
|  | ||
| # Pair with bonding enabled. | ||
| print("bond") | ||
| await connection.pair( | ||
| bond=True, # Enable bonding | ||
| le_secure=True, | ||
| mitm=False, | ||
| timeout_ms=TIMEOUT_MS, | ||
| ) | ||
|  | ||
| # Give additional time for bonding to complete after encryption | ||
| await asyncio.sleep_ms(500) | ||
|  | ||
| print( | ||
| "bonded encrypted={} authenticated={} bonded={}".format( | ||
| 1 if connection.encrypted else 0, | ||
| 1 if connection.authenticated else 0, | ||
| 1 if connection.bonded else 0, | ||
| ) | ||
| ) | ||
|  | ||
| # Read the peripheral's characteristic, should be encrypted. | ||
| print("read_encrypted") | ||
| data = await characteristic.read(timeout_ms=TIMEOUT_MS) | ||
| print("read", data) | ||
|  | ||
| # Check if secrets were saved | ||
| try: | ||
| os.stat("ble_secrets.json") | ||
| print("secrets_exist", "yes") | ||
| except: | ||
| print("secrets_exist", "no") | ||
|  | ||
| # Disconnect from peripheral. | ||
| print("disconnect") | ||
| await connection.disconnect(timeout_ms=TIMEOUT_MS) | ||
| print("disconnected") | ||
|  | ||
|  | ||
| def instance1(): | ||
| try: | ||
| asyncio.run(instance1_task()) | ||
| finally: | ||
| aioble.stop() | 
 
 
 
 17 changes: 17 additions & 0 deletions
 
 
 
 micropython/bluetooth/aioble/multitests/ble_bond.py.exp
 
 
 
 
  
 
 
 
 
 
 
 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
 Learn more about bidirectional Unicode characters
 
 
 
 
 | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| --- instance0 --- | ||
| advertise | ||
| connected | ||
| wait_for_bonding | ||
| bonded encrypted=1 authenticated=0 bonded=1 | ||
| disconnected | ||
| --- instance1 --- | ||
| connect | ||
| service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a') | ||
| characteristic UUID('00000000-1111-2222-3333-444444444444') | ||
| bond | ||
| bonded encrypted=1 authenticated=0 bonded=1 | ||
| read_encrypted | ||
| read b'bonded_data' | ||
| secrets_exist yes | ||
| disconnect | ||
| disconnected | 
 
 
 
 148 changes: 148 additions & 0 deletions
 
 
 
 micropython/bluetooth/aioble/multitests/ble_pair.py
 
 
 
 
  
 
 
 
 
 
 
 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
 Learn more about bidirectional Unicode characters
 
 
 
 
 | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| # Test BLE GAP pairing and bonding with aioble | ||
|  | ||
| import sys | ||
|  | ||
| # ruff: noqa: E402 | ||
| sys.path.append("") | ||
|  | ||
| from micropython import const | ||
| import machine | ||
| import time | ||
|  | ||
| import asyncio | ||
| import aioble | ||
| import bluetooth | ||
|  | ||
| TIMEOUT_MS = 5000 | ||
|  | ||
| SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A") | ||
| CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444") | ||
|  | ||
| _FLAG_READ = const(0x0002) | ||
| _FLAG_READ_ENCRYPTED = const(0x0200) | ||
|  | ||
|  | ||
| # For aioble, we need to directly use the low-level bluetooth API for encrypted characteristics | ||
| class EncryptedCharacteristic(aioble.Characteristic): | ||
| def __init__(self, service, uuid, **kwargs): | ||
| super().__init__(service, uuid, read=True, **kwargs) | ||
| # Override flags to add encryption requirement | ||
| self.flags |= _FLAG_READ_ENCRYPTED | ||
|  | ||
|  | ||
| # Acting in peripheral role. | ||
| async def instance0_task(): | ||
| # Clear any existing bond state | ||
| import os | ||
|  | ||
| try: | ||
| os.remove("ble_secrets.json") | ||
| except: | ||
| pass | ||
|  | ||
| service = aioble.Service(SERVICE_UUID) | ||
| characteristic = EncryptedCharacteristic(service, CHAR_UUID) | ||
| aioble.register_services(service) | ||
|  | ||
| multitest.globals(BDADDR=aioble.config("mac")) | ||
| multitest.next() | ||
|  | ||
| # Write initial characteristic value. | ||
| characteristic.write("encrypted_data") | ||
|  | ||
| # Wait for central to connect to us. | ||
| print("advertise") | ||
| connection = await aioble.advertise( | ||
| 20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS | ||
| ) | ||
| print("connected") | ||
|  | ||
| # Wait for pairing to complete | ||
| print("wait_for_pairing") | ||
| start_time = time.ticks_ms() | ||
| while not connection.encrypted and time.ticks_diff(time.ticks_ms(), start_time) < TIMEOUT_MS: | ||
| await asyncio.sleep_ms(100) | ||
|  | ||
| # Give a small delay for bonding state to stabilize | ||
| await asyncio.sleep_ms(200) | ||
|  | ||
| if connection.encrypted: | ||
| print( | ||
| "paired encrypted=1 authenticated={} bonded={}".format( | ||
| 1 if connection.authenticated else 0, 1 if connection.bonded else 0 | ||
| ) | ||
| ) | ||
| else: | ||
| print("pairing_timeout") | ||
|  | ||
| # Wait for the central to disconnect. | ||
| await connection.disconnected(timeout_ms=TIMEOUT_MS) | ||
| print("disconnected") | ||
|  | ||
|  | ||
| def instance0(): | ||
| try: | ||
| asyncio.run(instance0_task()) | ||
| finally: | ||
| aioble.stop() | ||
|  | ||
|  | ||
| # Acting in central role. | ||
| async def instance1_task(): | ||
| multitest.next() | ||
|  | ||
| # Clear any existing bond state | ||
| import os | ||
|  | ||
| try: | ||
| os.remove("ble_secrets.json") | ||
| except: | ||
| pass | ||
|  | ||
| # Connect to peripheral. | ||
| print("connect") | ||
| device = aioble.Device(*BDADDR) | ||
| connection = await device.connect(timeout_ms=TIMEOUT_MS) | ||
|  | ||
| # Discover characteristics (before pairing). | ||
| service = await connection.service(SERVICE_UUID) | ||
| print("service", service.uuid) | ||
| characteristic = await service.characteristic(CHAR_UUID) | ||
| print("characteristic", characteristic.uuid) | ||
|  | ||
| # Pair with the peripheral. | ||
| print("pair") | ||
| await connection.pair( | ||
| bond=False, # Don't bond for this test | ||
| le_secure=True, | ||
| mitm=False, | ||
| timeout_ms=TIMEOUT_MS, | ||
| ) | ||
|  | ||
| # Give a small delay for bonding state to stabilize | ||
| await asyncio.sleep_ms(200) | ||
|  | ||
| print( | ||
| "paired encrypted={} authenticated={} bonded={}".format( | ||
| 1 if connection.encrypted else 0, | ||
| 1 if connection.authenticated else 0, | ||
| 1 if connection.bonded else 0, | ||
| ) | ||
| ) | ||
|  | ||
| # Read the peripheral's characteristic, should be encrypted. | ||
| print("read_encrypted") | ||
| data = await characteristic.read(timeout_ms=TIMEOUT_MS) | ||
| print("read", data) | ||
|  | ||
| # Disconnect from peripheral. | ||
| print("disconnect") | ||
| await connection.disconnect(timeout_ms=TIMEOUT_MS) | ||
| print("disconnected") | ||
|  | ||
|  | ||
| def instance1(): | ||
| try: | ||
| asyncio.run(instance1_task()) | ||
| finally: | ||
| aioble.stop() | 
 
 
 
 16 changes: 16 additions & 0 deletions
 
 
 
 micropython/bluetooth/aioble/multitests/ble_pair.py.exp
 
 
 
 
  
 
 
 
 
 
 
 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
 Learn more about bidirectional Unicode characters
 
 
 
 
 | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| --- instance0 --- | ||
| advertise | ||
| connected | ||
| wait_for_pairing | ||
| paired encrypted=1 authenticated=0 bonded=0 | ||
| disconnected | ||
| --- instance1 --- | ||
| connect | ||
| service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a') | ||
| characteristic UUID('00000000-1111-2222-3333-444444444444') | ||
| pair | ||
| paired encrypted=1 authenticated=0 bonded=0 | ||
| read_encrypted | ||
| read b'encrypted_data' | ||
| disconnect | ||
| disconnected | 
 
 Oops, something went wrong.
 
 
 
 Add this suggestion to a batch that can be applied as a single commit.
 This suggestion is invalid because no changes were made to the code.
 Suggestions cannot be applied while the pull request is closed.
 Suggestions cannot be applied while viewing a subset of changes.
 Only one suggestion per line can be applied in a batch.
 Add this suggestion to a batch that can be applied as a single commit.
 Applying suggestions on deleted lines is not supported.
 You must change the existing code in this line in order to create a valid suggestion.
 Outdated suggestions cannot be applied.
 This suggestion has been applied or marked resolved.
 Suggestions cannot be applied from pending reviews.
 Suggestions cannot be applied on multi-line comments.
 Suggestions cannot be applied while the pull request is queued to merge.
 Suggestion cannot be applied right now. Please check back later.