commit b9a5a45aff9c128ea77e37cd05deb59161db7e61 Author: scayac Date: Fri Feb 20 14:43:47 2026 +0100 Add initial implementation of duplo-controller library with CLI and BLE support - Create .gitignore to exclude environment and build artifacts - Add README.md with project description, features, installation, and usage examples - Implement duplo_controller module with DuploTrainHub, DuploColor, and DuploSound classes - Add CLI for manual testing of the DUPLO Hub - Define project metadata in pyproject.toml - Include manual test script diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6d77209 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# Environnements virtuels +.venv/ +venv/ + +# Cache Python +__pycache__/ +*.py[cod] +*$py.class + +# Artefacts de packaging/build +build/ +dist/ +*.egg-info/ +.eggs/ + +# Outils de test / couverture +.pytest_cache/ +.coverage +.coverage.* +htmlcov/ + +# Type checking / lint cache +.mypy_cache/ +.ruff_cache/ + +# IDE / éditeur +.vscode/ +.idea/ + +# Fichiers système +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd4d0d0 --- /dev/null +++ b/README.md @@ -0,0 +1,111 @@ +# duplo-controller + +Librairie Python minimale pour contrôler un train LEGO DUPLO Bluetooth (Hub n°5) via le protocole LEGO Wireless. + +## Fonctionnalités + +- connexion BLE avec adresse explicite **ou découverte automatique** du hub +- avancer / reculer / stop (vitesse `-100..100`) +- jouer les sons DUPLO (`brake`, `station_departure`, `water_refill`, `horn`, `steam`) +- jouer un tone direct (`0..10`) via `play_tone(...)` +- allumer/changer la couleur de la lumière (11 couleurs) +- éteindre la lumière (`turn_off_light()`) +- script CLI interactif (`duplo-test`) pour tests manuels + +## Installation + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -e . +``` + +## Utilisation rapide + +```python +import asyncio +from duplo_controller import DuploTrainHub, DuploColor, DuploSound + +async def main(): + async with DuploTrainHub() as hub: + await hub.forward(50) + await asyncio.sleep(2) + await hub.stop() + await hub.play_sound(DuploSound.HORN) + await hub.change_light_color(DuploColor.WHITE) + await hub.play_tone(4) + await asyncio.sleep(1) + await hub.turn_off_light() + +asyncio.run(main()) +``` + +## Script de test manuel + +```bash +python -m tests.manual_test +# ou après installation: +duplo-test +``` + +Options: + +- `--address XX:XX:XX:XX:XX:XX` pour forcer une adresse BLE +- `--speed 50` vitesse par défaut du menu + +Dans le menu, l'option de couleur supporte : +`black`, `pink`, `purple`, `blue`, `lightblue`, `cyan`, `green`, `yellow`, `orange`, `red`, `white`. + +Dans le menu, l'option son supporte : +`brake`, `station_departure`, `water_refill`, `horn`, `steam`. + +## API principale + +- `await hub.connect()` / `await hub.disconnect()` +- `await hub.forward(speed=50)` +- `await hub.backward(speed=50)` +- `await hub.stop()` +- `await hub.set_motor_speed(speed)` +- `await hub.change_light_color(DuploColor.RED)` +- `await hub.turn_off_light()` +- `await hub.play_sound(DuploSound.HORN)` +- `await hub.play_tone(4)` +- `await hub.demo()` + +## Intégration pygame (résumé) + +`DuploTrainHub` est asynchrone (`asyncio`). + +- Option simple : démarrer une boucle `asyncio` dans un thread dédié et y soumettre les commandes BLE depuis la boucle pygame. +- Option avancée : piloter la boucle pygame depuis une coroutine principale `asyncio`. + +Exemple minimal de pattern (thread + loop asyncio) : + +```python +import asyncio +import threading + +ble_loop = asyncio.new_event_loop() +threading.Thread(target=ble_loop.run_forever, daemon=True).start() + +hub = DuploTrainHub() +asyncio.run_coroutine_threadsafe(hub.connect(), ble_loop).result() + +# Depuis un event pygame: +# asyncio.run_coroutine_threadsafe(hub.forward(60), ble_loop) + +# À la fin: +asyncio.run_coroutine_threadsafe(hub.disconnect(), ble_loop).result() +ble_loop.call_soon_threadsafe(ble_loop.stop) +``` + +## Notes techniques + +- Service LEGO Hub: `00001623-1212-efde-1623-785feabcd123` +- Characteristic LEGO Hub: `00001624-1212-efde-1623-785feabcd123` +- Hub DUPLO détecté via manufacturer data LEGO (`0x0397`) + type `0x20` + +## Références + +- Protocole officiel: https://lego.github.io/lego-ble-wireless-protocol-docs/ +- Référence commandes (inspiration): https://github.com/corneliusmunz/legoino diff --git a/duplo_controller/__init__.py b/duplo_controller/__init__.py new file mode 100644 index 0000000..0982480 --- /dev/null +++ b/duplo_controller/__init__.py @@ -0,0 +1,3 @@ +from .hub import DuploTrainHub, DuploColor, DuploSound + +__all__ = ["DuploTrainHub", "DuploColor", "DuploSound"] diff --git a/duplo_controller/cli.py b/duplo_controller/cli.py new file mode 100644 index 0000000..05793bc --- /dev/null +++ b/duplo_controller/cli.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import argparse +import asyncio + +from .hub import DuploColor, DuploSound, DuploTrainHub + + +MENU = """ +=== Test DUPLO Hub #5 === +1) Avancer +2) Reculer +3) Stop +4) Jouer une musique +5) Changer couleur lumière +7) Éteindre la lumière +6) Demo rapide +q) Quitter +""" + +SOUND_MENU = """ +Sons disponibles: +- brake +- station_departure +- water_refill +- horn +- steam +""" + +SOUND_BY_NAME: dict[str, DuploSound] = { + "brake": DuploSound.BRAKE, + "station_departure": DuploSound.STATION_DEPARTURE, + "water_refill": DuploSound.WATER_REFILL, + "horn": DuploSound.HORN, + "steam": DuploSound.STEAM, +} + +COLOR_MENU = """ +Couleurs disponibles: +- black +- pink +- purple +- blue +- lightblue +- cyan +- green +- yellow +- orange +- red +- white +""" + +COLOR_BY_NAME: dict[str, DuploColor] = { + "black": DuploColor.BLACK, + "pink": DuploColor.PINK, + "purple": DuploColor.PURPLE, + "blue": DuploColor.BLUE, + "lightblue": DuploColor.LIGHTBLUE, + "cyan": DuploColor.CYAN, + "green": DuploColor.GREEN, + "yellow": DuploColor.YELLOW, + "orange": DuploColor.ORANGE, + "red": DuploColor.RED, + "white": DuploColor.WHITE, +} + + +async def run_test(address: str | None, speed: int) -> None: + hub = DuploTrainHub(address=address) + await hub.connect() + print("Connecté au hub DUPLO.") + + try: + while True: + print(MENU) + choice = input("Choix: ").strip().lower() + + if choice == "1": + await hub.forward(speed) + print(f"Avance à {abs(speed)}%") + elif choice == "2": + await hub.backward(speed) + print(f"Recule à {abs(speed)}%") + elif choice == "3": + await hub.stop() + print("Stop") + elif choice == "4": + print(SOUND_MENU) + sound_name = input("Son: ").strip().lower() + sound = SOUND_BY_NAME.get(sound_name) + if sound is None: + print("Son invalide") + continue + await hub.play_sound(sound) + print(f"Musique jouée ({sound_name.upper()})") + elif choice == "5": + print(COLOR_MENU) + color_name = input("Couleur: ").strip().lower() + color = COLOR_BY_NAME.get(color_name) + if color is None: + print("Couleur invalide") + continue + await hub.change_light_color(color) + print(f"Couleur lumière changée ({color_name.upper()})") + elif choice == "7": + await hub.turn_off_light() + print("Lumière éteinte") + elif choice == "6": + await hub.demo() + print("Demo terminée") + elif choice == "q": + break + else: + print("Choix invalide") + finally: + await hub.stop() + await hub.disconnect() + print("Déconnecté") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Test manuel du hub DUPLO Train #5") + parser.add_argument("--address", help="Adresse BLE du hub (optionnel)") + parser.add_argument("--speed", type=int, default=50, help="Vitesse par défaut 0..100") + args = parser.parse_args() + + asyncio.run(run_test(address=args.address, speed=max(0, min(100, args.speed)))) + + +if __name__ == "__main__": + main() diff --git a/duplo_controller/hub.py b/duplo_controller/hub.py new file mode 100644 index 0000000..702df72 --- /dev/null +++ b/duplo_controller/hub.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from enum import IntEnum +from typing import Optional + +from bleak import BleakClient, BleakScanner + + +LEGO_MANUFACTURER_ID = 0x0397 +DUPLO_HUB_ID = 0x20 + +LEGO_HUB_SERVICE_UUID = "00001623-1212-efde-1623-785feabcd123" +LEGO_HUB_CHARACTERISTIC_UUID = "00001624-1212-efde-1623-785feabcd123" + + +class DuploColor(IntEnum): + BLACK = 0 + PINK = 1 + PURPLE = 2 + BLUE = 3 + LIGHTBLUE = 4 + CYAN = 5 + GREEN = 6 + YELLOW = 7 + ORANGE = 8 + RED = 9 + WHITE = 10 + + +class DuploSound(IntEnum): + BRAKE = 3 + STATION_DEPARTURE = 5 + WATER_REFILL = 7 + HORN = 9 + STEAM = 10 + + +@dataclass +class DuploPorts: + MOTOR: int = 0x00 + SPEAKER: int = 0x01 + LED: int = 0x11 + + +class DuploTrainHub: + def __init__(self, address: Optional[str] = None, timeout: float = 10.0) -> None: + self._address = address + self._timeout = timeout + self._client: Optional[BleakClient] = None + self.ports = DuploPorts() + + @property + def is_connected(self) -> bool: + return self._client is not None and self._client.is_connected + + async def connect(self) -> None: + if self.is_connected: + return + + if not self._address: + self._address = await self._discover_duplo_hub() + + self._client = BleakClient(self._address, timeout=self._timeout) + await self._client.connect() + + async def disconnect(self) -> None: + if self._client is not None: + await self._client.disconnect() + self._client = None + + async def __aenter__(self) -> "DuploTrainHub": + await self.connect() + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.disconnect() + + async def set_motor_speed(self, speed: int) -> None: + mapped = self._clamp(speed, -100, 100) + payload = bytes([0x81, self.ports.MOTOR, 0x11, 0x51, 0x00, mapped & 0xFF]) + await self._write(payload) + + async def forward(self, speed: int = 50) -> None: + await self.set_motor_speed(abs(speed)) + + async def backward(self, speed: int = 50) -> None: + await self.set_motor_speed(-abs(speed)) + + async def stop(self) -> None: + await self.set_motor_speed(0) + + async def set_light(self, color: DuploColor) -> None: + set_color_mode = bytes([0x41, self.ports.LED, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00]) + set_color = bytes([0x81, self.ports.LED, 0x11, 0x51, 0x00, int(color) & 0xFF]) + await self._write(set_color_mode) + await asyncio.sleep(0.05) + await self._write(set_color) + + async def change_light_color(self, color: DuploColor) -> None: + await self.set_light(color) + + async def turn_off_light(self) -> None: + await self.set_light(DuploColor.BLACK) + + async def play_sound(self, sound: DuploSound) -> None: + set_sound_mode = bytes([0x41, self.ports.SPEAKER, 0x01, 0x01, 0x00, 0x00, 0x00, 0x01]) + play_sound = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x01, int(sound) & 0xFF]) + await self._write(set_sound_mode) + await asyncio.sleep(0.05) + await self._write(play_sound) + + async def play_tone(self, tone_number: int) -> None: + tone = self._clamp(tone_number, 0, 10) + set_tone_mode = bytes([0x41, self.ports.SPEAKER, 0x02, 0x01, 0x00, 0x00, 0x00, 0x01]) + play_tone = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x02, tone & 0xFF]) + await self._write(set_tone_mode) + await asyncio.sleep(0.05) + await self._write(play_tone) + + async def demo(self) -> None: + await self.set_light(DuploColor.GREEN) + await self.forward(40) + await asyncio.sleep(2) + await self.stop() + await self.play_sound(DuploSound.HORN) + + async def _write(self, payload: bytes) -> None: + if not self.is_connected or self._client is None: + raise RuntimeError("Hub non connecté. Appelez connect() avant.") + framed_payload = self._frame_message(payload) + await self._client.write_gatt_char(LEGO_HUB_CHARACTERISTIC_UUID, framed_payload, response=True) + + @staticmethod + def _frame_message(message_without_header: bytes) -> bytes: + message_length = len(message_without_header) + 2 + if message_length > 127: + raise ValueError("Message trop long pour l'encodage court LEGO BLE.") + return bytes([message_length, 0x00]) + message_without_header + + async def _discover_duplo_hub(self) -> str: + candidates: list[tuple[object, dict]] = [] + + try: + discovered = await BleakScanner.discover(timeout=self._timeout, return_adv=True) + for _, value in discovered.items(): + device, adv = value + candidates.append((device, getattr(adv, "manufacturer_data", {}) or {})) + except TypeError: + devices = await BleakScanner.discover(timeout=self._timeout) + for device in devices: + metadata = getattr(device, "metadata", {}) or {} + candidates.append((device, metadata.get("manufacturer_data", {}) or {})) + + for device, manufacturer_data in candidates: + manu = manufacturer_data.get(LEGO_MANUFACTURER_ID) + name = getattr(device, "name", "") or "" + address = getattr(device, "address", None) + if not address: + continue + if manu and len(manu) >= 2 and manu[1] == DUPLO_HUB_ID: + return address + if "duplo" in name.lower(): + return address + raise RuntimeError("Aucun hub DUPLO détecté. Vérifiez qu'il est allumé et en mode appairage.") + + @staticmethod + def _clamp(value: int, minimum: int, maximum: int) -> int: + return max(minimum, min(maximum, int(value))) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d70a8e6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "duplo-controller" +version = "0.1.0" +description = "Python library to control LEGO DUPLO Train Hub (Hub #5) over BLE" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "bleak>=0.22.0", +] + +[project.scripts] +duplo-test = "duplo_controller.cli:main" + +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +package-dir = {"" = "."} + +[tool.setuptools.packages.find] +where = ["."] +include = ["duplo_controller*"] +exclude = ["tests*"] diff --git a/tests/manual_test.py b/tests/manual_test.py new file mode 100644 index 0000000..4561eb2 --- /dev/null +++ b/tests/manual_test.py @@ -0,0 +1,5 @@ +from duplo_controller.cli import main + + +if __name__ == "__main__": + main()