You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

170 lines
5.7 KiB

from __future__ import annotations
import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
from bleak import BleakClient, BleakScanner
LEGO_MANUFACTURER_ID = 0x0397
DUPLO_HUB_ID = 0x20
LEGO_HUB_SERVICE_UUID = "00001623-1212-efde-1623-785feabcd123"
LEGO_HUB_CHARACTERISTIC_UUID = "00001624-1212-efde-1623-785feabcd123"
class DuploColor(IntEnum):
BLACK = 0
PINK = 1
PURPLE = 2
BLUE = 3
LIGHTBLUE = 4
CYAN = 5
GREEN = 6
YELLOW = 7
ORANGE = 8
RED = 9
WHITE = 10
class DuploSound(IntEnum):
BRAKE = 3
STATION_DEPARTURE = 5
WATER_REFILL = 7
HORN = 9
STEAM = 10
@dataclass
class DuploPorts:
MOTOR: int = 0x00
SPEAKER: int = 0x01
LED: int = 0x11
class DuploTrainHub:
def __init__(self, address: Optional[str] = None, timeout: float = 10.0) -> None:
self._address = address
self._timeout = timeout
self._client: Optional[BleakClient] = None
self.ports = DuploPorts()
@property
def is_connected(self) -> bool:
return self._client is not None and self._client.is_connected
async def connect(self) -> None:
if self.is_connected:
return
if not self._address:
self._address = await self._discover_duplo_hub()
self._client = BleakClient(self._address, timeout=self._timeout)
await self._client.connect()
async def disconnect(self) -> None:
if self._client is not None:
await self._client.disconnect()
self._client = None
async def __aenter__(self) -> "DuploTrainHub":
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.disconnect()
async def set_motor_speed(self, speed: int) -> None:
mapped = self._clamp(speed, -100, 100)
payload = bytes([0x81, self.ports.MOTOR, 0x11, 0x51, 0x00, mapped & 0xFF])
await self._write(payload)
async def forward(self, speed: int = 50) -> None:
await self.set_motor_speed(abs(speed))
async def backward(self, speed: int = 50) -> None:
await self.set_motor_speed(-abs(speed))
async def stop(self) -> None:
await self.set_motor_speed(0)
async def set_light(self, color: DuploColor) -> None:
set_color_mode = bytes([0x41, self.ports.LED, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00])
set_color = bytes([0x81, self.ports.LED, 0x11, 0x51, 0x00, int(color) & 0xFF])
await self._write(set_color_mode)
await asyncio.sleep(0.05)
await self._write(set_color)
async def change_light_color(self, color: DuploColor) -> None:
await self.set_light(color)
async def turn_off_light(self) -> None:
await self.set_light(DuploColor.BLACK)
async def play_sound(self, sound: DuploSound) -> None:
set_sound_mode = bytes([0x41, self.ports.SPEAKER, 0x01, 0x01, 0x00, 0x00, 0x00, 0x01])
play_sound = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x01, int(sound) & 0xFF])
await self._write(set_sound_mode)
await asyncio.sleep(0.05)
await self._write(play_sound)
async def play_tone(self, tone_number: int) -> None:
tone = self._clamp(tone_number, 0, 10)
set_tone_mode = bytes([0x41, self.ports.SPEAKER, 0x02, 0x01, 0x00, 0x00, 0x00, 0x01])
play_tone = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x02, tone & 0xFF])
await self._write(set_tone_mode)
await asyncio.sleep(0.05)
await self._write(play_tone)
async def demo(self) -> None:
await self.set_light(DuploColor.GREEN)
await self.forward(40)
await asyncio.sleep(2)
await self.stop()
await self.play_sound(DuploSound.HORN)
async def _write(self, payload: bytes) -> None:
if not self.is_connected or self._client is None:
raise RuntimeError("Hub non connecté. Appelez connect() avant.")
framed_payload = self._frame_message(payload)
await self._client.write_gatt_char(LEGO_HUB_CHARACTERISTIC_UUID, framed_payload, response=True)
@staticmethod
def _frame_message(message_without_header: bytes) -> bytes:
message_length = len(message_without_header) + 2
if message_length > 127:
raise ValueError("Message trop long pour l'encodage court LEGO BLE.")
return bytes([message_length, 0x00]) + message_without_header
async def _discover_duplo_hub(self) -> str:
candidates: list[tuple[object, dict]] = []
try:
discovered = await BleakScanner.discover(timeout=self._timeout, return_adv=True)
for _, value in discovered.items():
device, adv = value
candidates.append((device, getattr(adv, "manufacturer_data", {}) or {}))
except TypeError:
devices = await BleakScanner.discover(timeout=self._timeout)
for device in devices:
metadata = getattr(device, "metadata", {}) or {}
candidates.append((device, metadata.get("manufacturer_data", {}) or {}))
for device, manufacturer_data in candidates:
manu = manufacturer_data.get(LEGO_MANUFACTURER_ID)
name = getattr(device, "name", "") or ""
address = getattr(device, "address", None)
if not address:
continue
if manu and len(manu) >= 2 and manu[1] == DUPLO_HUB_ID:
return address
if "duplo" in name.lower():
return address
raise RuntimeError("Aucun hub DUPLO détecté. Vérifiez qu'il est allumé et en mode appairage.")
@staticmethod
def _clamp(value: int, minimum: int, maximum: int) -> int:
return max(minimum, min(maximum, int(value)))