"""Asyncio helper for driving a LEGO DUPLO train hub over Bluetooth LE using bleak."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional

from bleak import BleakClient, BleakScanner

# Key looked up in the advertisement's manufacturer data during discovery.
LEGO_MANUFACTURER_ID = 0x0397
# Value expected in the second manufacturer-data byte for a DUPLO train hub.
DUPLO_HUB_ID = 0x20
LEGO_HUB_SERVICE_UUID = "00001623-1212-efde-1623-785feabcd123"
# GATT characteristic every framed message is written to.
LEGO_HUB_CHARACTERISTIC_UUID = "00001624-1212-efde-1623-785feabcd123"


class DuploColor(IntEnum):
    """Color indexes accepted by :meth:`DuploTrainHub.set_light`."""

    BLACK = 0
    PINK = 1
    PURPLE = 2
    BLUE = 3
    LIGHTBLUE = 4
    CYAN = 5
    GREEN = 6
    YELLOW = 7
    ORANGE = 8
    RED = 9
    WHITE = 10


class DuploSound(IntEnum):
    """Sound identifiers accepted by :meth:`DuploTrainHub.play_sound`."""

    BRAKE = 3
    STATION_DEPARTURE = 5
    WATER_REFILL = 7
    HORN = 9
    STEAM = 10


@dataclass
class DuploPorts:
    """Port identifiers placed in the second byte of each command payload."""

    MOTOR: int = 0x00
    SPEAKER: int = 0x01
    LED: int = 0x11


class DuploTrainHub:
    """Client for a single DUPLO train hub.

    Usable as an async context manager: entering connects, leaving
    disconnects.

    Args:
        address: BLE address of the hub. When omitted, the first call to
            :meth:`connect` scans for a hub and remembers its address.
        timeout: Seconds used both for the discovery scan and as the
            ``BleakClient`` timeout.
    """

    # Pause between configuring a port mode and sending the command that uses it.
    _MODE_SETUP_DELAY = 0.05

    def __init__(self, address: Optional[str] = None, timeout: float = 10.0) -> None:
        self._address = address
        self._timeout = timeout
        self._client: Optional[BleakClient] = None
        self.ports = DuploPorts()

    @property
    def is_connected(self) -> bool:
        """Whether a client exists and reports itself as connected."""
        return self._client is not None and self._client.is_connected

    async def connect(self) -> None:
        """Connect to the hub, discovering its address first if none is known.

        Does nothing when already connected.

        Raises:
            RuntimeError: If no address was given and discovery finds no hub.
        """
        if self.is_connected:
            return
        if not self._address:
            self._address = await self._discover_duplo_hub()
        self._client = BleakClient(self._address, timeout=self._timeout)
        await self._client.connect()

    async def disconnect(self) -> None:
        """Disconnect from the hub and drop the client reference.

        The reference is cleared even when the underlying disconnect raises,
        so a later :meth:`connect` always starts from a fresh client.
        """
        client = self._client
        if client is None:
            return
        try:
            await client.disconnect()
        finally:
            self._client = None

    async def __aenter__(self) -> "DuploTrainHub":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.disconnect()

    async def set_motor_speed(self, speed: int) -> None:
        """Send a motor command with ``speed`` clamped to ``[-100, 100]``.

        Negative speeds are sent as a single byte (two's complement via
        ``& 0xFF``).
        """
        mapped = self._clamp(speed, -100, 100)
        payload = bytes([0x81, self.ports.MOTOR, 0x11, 0x51, 0x00, mapped & 0xFF])
        await self._write(payload)

    async def forward(self, speed: int = 50) -> None:
        """Drive with ``abs(speed)``."""
        await self.set_motor_speed(abs(speed))

    async def backward(self, speed: int = 50) -> None:
        """Drive with ``-abs(speed)``."""
        await self.set_motor_speed(-abs(speed))

    async def stop(self) -> None:
        """Set the motor speed to zero."""
        await self.set_motor_speed(0)

    async def set_light(self, color: DuploColor) -> None:
        """Configure the LED port, then write ``color`` to it."""
        set_color_mode = bytes([0x41, self.ports.LED, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00])
        set_color = bytes([0x81, self.ports.LED, 0x11, 0x51, 0x00, int(color) & 0xFF])
        await self._setup_then_send(set_color_mode, set_color)

    async def change_light_color(self, color: DuploColor) -> None:
        """Alias of :meth:`set_light`."""
        await self.set_light(color)

    async def turn_off_light(self) -> None:
        """Set the light to ``DuploColor.BLACK``."""
        await self.set_light(DuploColor.BLACK)

    async def play_sound(self, sound: DuploSound) -> None:
        """Configure the speaker port for mode ``0x01``, then write ``sound``."""
        set_sound_mode = bytes([0x41, self.ports.SPEAKER, 0x01, 0x01, 0x00, 0x00, 0x00, 0x01])
        play_sound = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x01, int(sound) & 0xFF])
        await self._setup_then_send(set_sound_mode, play_sound)

    async def play_tone(self, tone_number: int) -> None:
        """Configure the speaker port for mode ``0x02``, then write the tone.

        ``tone_number`` is clamped to ``[0, 10]``.
        """
        tone = self._clamp(tone_number, 0, 10)
        set_tone_mode = bytes([0x41, self.ports.SPEAKER, 0x02, 0x01, 0x00, 0x00, 0x00, 0x01])
        play_tone = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x02, tone & 0xFF])
        await self._setup_then_send(set_tone_mode, play_tone)

    async def demo(self) -> None:
        """Green light, two seconds forward at speed 40, stop, then horn."""
        await self.set_light(DuploColor.GREEN)
        await self.forward(40)
        await asyncio.sleep(2)
        await self.stop()
        await self.play_sound(DuploSound.HORN)

    async def _setup_then_send(self, setup: bytes, command: bytes) -> None:
        """Write ``setup``, wait briefly, then write ``command``."""
        await self._write(setup)
        await asyncio.sleep(self._MODE_SETUP_DELAY)
        await self._write(command)

    async def _write(self, payload: bytes) -> None:
        """Frame ``payload`` and write it to the hub characteristic.

        Raises:
            RuntimeError: If the hub is not connected.
            ValueError: If the framed message would exceed 127 bytes.
        """
        if not self.is_connected or self._client is None:
            raise RuntimeError("Hub non connecté. Appelez connect() avant.")
        framed_payload = self._frame_message(payload)
        await self._client.write_gatt_char(
            LEGO_HUB_CHARACTERISTIC_UUID, framed_payload, response=True
        )

    @staticmethod
    def _frame_message(message_without_header: bytes) -> bytes:
        """Prefix a message with its two-byte header.

        The header is the total length (header included) followed by
        ``0x00``.

        Raises:
            ValueError: If the total length exceeds 127, the largest value
                this single-byte length encoding accepts.
        """
        message_length = len(message_without_header) + 2
        if message_length > 127:
            raise ValueError("Message trop long pour l'encodage court LEGO BLE.")
        return bytes([message_length, 0x00]) + message_without_header

    async def _discover_duplo_hub(self) -> str:
        """Scan for a DUPLO hub and return its address.

        A device matches when the second byte of its LEGO manufacturer data
        equals ``DUPLO_HUB_ID`` or when its name contains "duplo"
        (case-insensitive). The first match in scan order wins.

        Raises:
            RuntimeError: If no matching device is found.
        """
        # Only the call itself is guarded: a TypeError here means this bleak
        # version rejects ``return_adv``; errors while reading the results
        # should surface rather than trigger a second scan.
        try:
            discovered = await BleakScanner.discover(timeout=self._timeout, return_adv=True)
        except TypeError:
            discovered = await BleakScanner.discover(timeout=self._timeout)

        candidates: list[tuple[object, dict]] = []
        if isinstance(discovered, dict):
            # return_adv=True form: values are (device, advertisement) pairs.
            for device, adv in discovered.values():
                candidates.append((device, getattr(adv, "manufacturer_data", {}) or {}))
        else:
            # Plain device list (fallback call, or ``return_adv`` was ignored):
            # manufacturer data, if any, lives in the device metadata.
            for device in discovered:
                metadata = getattr(device, "metadata", {}) or {}
                candidates.append((device, metadata.get("manufacturer_data", {}) or {}))

        for device, manufacturer_data in candidates:
            address = getattr(device, "address", None)
            if not address:
                continue
            manu = manufacturer_data.get(LEGO_MANUFACTURER_ID)
            if manu and len(manu) >= 2 and manu[1] == DUPLO_HUB_ID:
                return address
            name = getattr(device, "name", "") or ""
            if "duplo" in name.lower():
                return address

        raise RuntimeError(
            "Aucun hub DUPLO détecté. Vérifiez qu'il est allumé et en mode appairage."
        )

    @staticmethod
    def _clamp(value: int, minimum: int, maximum: int) -> int:
        """Return ``int(value)`` limited to ``[minimum, maximum]``."""
        return max(minimum, min(maximum, int(value)))