Browse Source

Add initial implementation of duplo-controller library with CLI and BLE support

- Create .gitignore to exclude environment and build artifacts
- Add README.md with project description, features, installation, and usage examples
- Implement duplo_controller module with DuploTrainHub, DuploColor, and DuploSound classes
- Add CLI for manual testing of the DUPLO Hub
- Define project metadata in pyproject.toml
- Include manual test script
master
scayac 4 weeks ago
commit
b9a5a45aff
  1. 32
      .gitignore
  2. 111
      README.md
  3. 3
      duplo_controller/__init__.py
  4. 131
      duplo_controller/cli.py
  5. 170
      duplo_controller/hub.py
  6. 24
      pyproject.toml
  7. 5
      tests/manual_test.py

32
.gitignore vendored

@ -0,0 +1,32 @@ @@ -0,0 +1,32 @@
# Environnements virtuels
.venv/
venv/
# Cache Python
__pycache__/
*.py[cod]
*$py.class
# Artefacts de packaging/build
build/
dist/
*.egg-info/
.eggs/
# Outils de test / couverture
.pytest_cache/
.coverage
.coverage.*
htmlcov/
# Type checking / lint cache
.mypy_cache/
.ruff_cache/
# IDE / éditeur
.vscode/
.idea/
# Fichiers système
.DS_Store
Thumbs.db

111
README.md

@ -0,0 +1,111 @@ @@ -0,0 +1,111 @@
# duplo-controller
Librairie Python minimale pour contrôler un train LEGO DUPLO Bluetooth (Hub n°5) via le protocole LEGO Wireless.
## Fonctionnalités
- connexion BLE avec adresse explicite **ou découverte automatique** du hub
- avancer / reculer / stop (vitesse `-100..100`)
- jouer les sons DUPLO (`brake`, `station_departure`, `water_refill`, `horn`, `steam`)
- jouer un tone direct (`0..10`) via `play_tone(...)`
- allumer/changer la couleur de la lumière (11 couleurs)
- éteindre la lumière (`turn_off_light()`)
- script CLI interactif (`duplo-test`) pour tests manuels
## Installation
```bash
python -m venv .venv
source .venv/bin/activate
pip install -e .
```
## Utilisation rapide
```python
import asyncio
from duplo_controller import DuploTrainHub, DuploColor, DuploSound
async def main():
async with DuploTrainHub() as hub:
await hub.forward(50)
await asyncio.sleep(2)
await hub.stop()
await hub.play_sound(DuploSound.HORN)
await hub.change_light_color(DuploColor.WHITE)
await hub.play_tone(4)
await asyncio.sleep(1)
await hub.turn_off_light()
asyncio.run(main())
```
## Script de test manuel
```bash
python -m tests.manual_test
# ou après installation:
duplo-test
```
Options:
- `--address XX:XX:XX:XX:XX:XX` pour forcer une adresse BLE
- `--speed 50` vitesse par défaut du menu
Dans le menu, l'option de couleur supporte :
`black`, `pink`, `purple`, `blue`, `lightblue`, `cyan`, `green`, `yellow`, `orange`, `red`, `white`.
Dans le menu, l'option son supporte :
`brake`, `station_departure`, `water_refill`, `horn`, `steam`.
## API principale
- `await hub.connect()` / `await hub.disconnect()`
- `await hub.forward(speed=50)`
- `await hub.backward(speed=50)`
- `await hub.stop()`
- `await hub.set_motor_speed(speed)`
- `await hub.change_light_color(DuploColor.RED)`
- `await hub.turn_off_light()`
- `await hub.play_sound(DuploSound.HORN)`
- `await hub.play_tone(4)`
- `await hub.demo()`
## Intégration pygame (résumé)
`DuploTrainHub` est asynchrone (`asyncio`).
- Option simple : démarrer une boucle `asyncio` dans un thread dédié et y soumettre les commandes BLE depuis la boucle pygame.
- Option avancée : piloter la boucle pygame depuis une coroutine principale `asyncio`.
Exemple minimal de pattern (thread + loop asyncio) :
```python
import asyncio
import threading
ble_loop = asyncio.new_event_loop()
threading.Thread(target=ble_loop.run_forever, daemon=True).start()
hub = DuploTrainHub()
asyncio.run_coroutine_threadsafe(hub.connect(), ble_loop).result()
# Depuis un event pygame:
# asyncio.run_coroutine_threadsafe(hub.forward(60), ble_loop)
# À la fin:
asyncio.run_coroutine_threadsafe(hub.disconnect(), ble_loop).result()
ble_loop.call_soon_threadsafe(ble_loop.stop)
```
## Notes techniques
- Service LEGO Hub: `00001623-1212-efde-1623-785feabcd123`
- Characteristic LEGO Hub: `00001624-1212-efde-1623-785feabcd123`
- Hub DUPLO détecté via manufacturer data LEGO (`0x0397`) + type `0x20`
## Références
- Protocole officiel: https://lego.github.io/lego-ble-wireless-protocol-docs/
- Référence commandes (inspiration): https://github.com/corneliusmunz/legoino

3
duplo_controller/__init__.py

@ -0,0 +1,3 @@ @@ -0,0 +1,3 @@
from .hub import DuploTrainHub, DuploColor, DuploSound
__all__ = ["DuploTrainHub", "DuploColor", "DuploSound"]

131
duplo_controller/cli.py

@ -0,0 +1,131 @@ @@ -0,0 +1,131 @@
from __future__ import annotations
import argparse
import asyncio
from .hub import DuploColor, DuploSound, DuploTrainHub
MENU = """
=== Test DUPLO Hub #5 ===
1) Avancer
2) Reculer
3) Stop
4) Jouer une musique
5) Changer couleur lumière
7) Éteindre la lumière
6) Demo rapide
q) Quitter
"""
SOUND_MENU = """
Sons disponibles:
- brake
- station_departure
- water_refill
- horn
- steam
"""
SOUND_BY_NAME: dict[str, DuploSound] = {
"brake": DuploSound.BRAKE,
"station_departure": DuploSound.STATION_DEPARTURE,
"water_refill": DuploSound.WATER_REFILL,
"horn": DuploSound.HORN,
"steam": DuploSound.STEAM,
}
COLOR_MENU = """
Couleurs disponibles:
- black
- pink
- purple
- blue
- lightblue
- cyan
- green
- yellow
- orange
- red
- white
"""
COLOR_BY_NAME: dict[str, DuploColor] = {
"black": DuploColor.BLACK,
"pink": DuploColor.PINK,
"purple": DuploColor.PURPLE,
"blue": DuploColor.BLUE,
"lightblue": DuploColor.LIGHTBLUE,
"cyan": DuploColor.CYAN,
"green": DuploColor.GREEN,
"yellow": DuploColor.YELLOW,
"orange": DuploColor.ORANGE,
"red": DuploColor.RED,
"white": DuploColor.WHITE,
}
async def run_test(address: str | None, speed: int) -> None:
hub = DuploTrainHub(address=address)
await hub.connect()
print("Connecté au hub DUPLO.")
try:
while True:
print(MENU)
choice = input("Choix: ").strip().lower()
if choice == "1":
await hub.forward(speed)
print(f"Avance à {abs(speed)}%")
elif choice == "2":
await hub.backward(speed)
print(f"Recule à {abs(speed)}%")
elif choice == "3":
await hub.stop()
print("Stop")
elif choice == "4":
print(SOUND_MENU)
sound_name = input("Son: ").strip().lower()
sound = SOUND_BY_NAME.get(sound_name)
if sound is None:
print("Son invalide")
continue
await hub.play_sound(sound)
print(f"Musique jouée ({sound_name.upper()})")
elif choice == "5":
print(COLOR_MENU)
color_name = input("Couleur: ").strip().lower()
color = COLOR_BY_NAME.get(color_name)
if color is None:
print("Couleur invalide")
continue
await hub.change_light_color(color)
print(f"Couleur lumière changée ({color_name.upper()})")
elif choice == "7":
await hub.turn_off_light()
print("Lumière éteinte")
elif choice == "6":
await hub.demo()
print("Demo terminée")
elif choice == "q":
break
else:
print("Choix invalide")
finally:
await hub.stop()
await hub.disconnect()
print("Déconnecté")
def main() -> None:
parser = argparse.ArgumentParser(description="Test manuel du hub DUPLO Train #5")
parser.add_argument("--address", help="Adresse BLE du hub (optionnel)")
parser.add_argument("--speed", type=int, default=50, help="Vitesse par défaut 0..100")
args = parser.parse_args()
asyncio.run(run_test(address=args.address, speed=max(0, min(100, args.speed))))
if __name__ == "__main__":
main()

170
duplo_controller/hub.py

@ -0,0 +1,170 @@ @@ -0,0 +1,170 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
from bleak import BleakClient, BleakScanner
LEGO_MANUFACTURER_ID = 0x0397
DUPLO_HUB_ID = 0x20
LEGO_HUB_SERVICE_UUID = "00001623-1212-efde-1623-785feabcd123"
LEGO_HUB_CHARACTERISTIC_UUID = "00001624-1212-efde-1623-785feabcd123"
class DuploColor(IntEnum):
BLACK = 0
PINK = 1
PURPLE = 2
BLUE = 3
LIGHTBLUE = 4
CYAN = 5
GREEN = 6
YELLOW = 7
ORANGE = 8
RED = 9
WHITE = 10
class DuploSound(IntEnum):
BRAKE = 3
STATION_DEPARTURE = 5
WATER_REFILL = 7
HORN = 9
STEAM = 10
@dataclass
class DuploPorts:
MOTOR: int = 0x00
SPEAKER: int = 0x01
LED: int = 0x11
class DuploTrainHub:
def __init__(self, address: Optional[str] = None, timeout: float = 10.0) -> None:
self._address = address
self._timeout = timeout
self._client: Optional[BleakClient] = None
self.ports = DuploPorts()
@property
def is_connected(self) -> bool:
return self._client is not None and self._client.is_connected
async def connect(self) -> None:
if self.is_connected:
return
if not self._address:
self._address = await self._discover_duplo_hub()
self._client = BleakClient(self._address, timeout=self._timeout)
await self._client.connect()
async def disconnect(self) -> None:
if self._client is not None:
await self._client.disconnect()
self._client = None
async def __aenter__(self) -> "DuploTrainHub":
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.disconnect()
async def set_motor_speed(self, speed: int) -> None:
mapped = self._clamp(speed, -100, 100)
payload = bytes([0x81, self.ports.MOTOR, 0x11, 0x51, 0x00, mapped & 0xFF])
await self._write(payload)
async def forward(self, speed: int = 50) -> None:
await self.set_motor_speed(abs(speed))
async def backward(self, speed: int = 50) -> None:
await self.set_motor_speed(-abs(speed))
async def stop(self) -> None:
await self.set_motor_speed(0)
async def set_light(self, color: DuploColor) -> None:
set_color_mode = bytes([0x41, self.ports.LED, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00])
set_color = bytes([0x81, self.ports.LED, 0x11, 0x51, 0x00, int(color) & 0xFF])
await self._write(set_color_mode)
await asyncio.sleep(0.05)
await self._write(set_color)
async def change_light_color(self, color: DuploColor) -> None:
await self.set_light(color)
async def turn_off_light(self) -> None:
await self.set_light(DuploColor.BLACK)
async def play_sound(self, sound: DuploSound) -> None:
set_sound_mode = bytes([0x41, self.ports.SPEAKER, 0x01, 0x01, 0x00, 0x00, 0x00, 0x01])
play_sound = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x01, int(sound) & 0xFF])
await self._write(set_sound_mode)
await asyncio.sleep(0.05)
await self._write(play_sound)
async def play_tone(self, tone_number: int) -> None:
tone = self._clamp(tone_number, 0, 10)
set_tone_mode = bytes([0x41, self.ports.SPEAKER, 0x02, 0x01, 0x00, 0x00, 0x00, 0x01])
play_tone = bytes([0x81, self.ports.SPEAKER, 0x11, 0x51, 0x02, tone & 0xFF])
await self._write(set_tone_mode)
await asyncio.sleep(0.05)
await self._write(play_tone)
async def demo(self) -> None:
await self.set_light(DuploColor.GREEN)
await self.forward(40)
await asyncio.sleep(2)
await self.stop()
await self.play_sound(DuploSound.HORN)
async def _write(self, payload: bytes) -> None:
if not self.is_connected or self._client is None:
raise RuntimeError("Hub non connecté. Appelez connect() avant.")
framed_payload = self._frame_message(payload)
await self._client.write_gatt_char(LEGO_HUB_CHARACTERISTIC_UUID, framed_payload, response=True)
@staticmethod
def _frame_message(message_without_header: bytes) -> bytes:
message_length = len(message_without_header) + 2
if message_length > 127:
raise ValueError("Message trop long pour l'encodage court LEGO BLE.")
return bytes([message_length, 0x00]) + message_without_header
async def _discover_duplo_hub(self) -> str:
candidates: list[tuple[object, dict]] = []
try:
discovered = await BleakScanner.discover(timeout=self._timeout, return_adv=True)
for _, value in discovered.items():
device, adv = value
candidates.append((device, getattr(adv, "manufacturer_data", {}) or {}))
except TypeError:
devices = await BleakScanner.discover(timeout=self._timeout)
for device in devices:
metadata = getattr(device, "metadata", {}) or {}
candidates.append((device, metadata.get("manufacturer_data", {}) or {}))
for device, manufacturer_data in candidates:
manu = manufacturer_data.get(LEGO_MANUFACTURER_ID)
name = getattr(device, "name", "") or ""
address = getattr(device, "address", None)
if not address:
continue
if manu and len(manu) >= 2 and manu[1] == DUPLO_HUB_ID:
return address
if "duplo" in name.lower():
return address
raise RuntimeError("Aucun hub DUPLO détecté. Vérifiez qu'il est allumé et en mode appairage.")
@staticmethod
def _clamp(value: int, minimum: int, maximum: int) -> int:
return max(minimum, min(maximum, int(value)))

24
pyproject.toml

@ -0,0 +1,24 @@ @@ -0,0 +1,24 @@
[project]
name = "duplo-controller"
version = "0.1.0"
description = "Python library to control LEGO DUPLO Train Hub (Hub #5) over BLE"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"bleak>=0.22.0",
]
[project.scripts]
duplo-test = "duplo_controller.cli:main"
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
package-dir = {"" = "."}
[tool.setuptools.packages.find]
where = ["."]
include = ["duplo_controller*"]
exclude = ["tests*"]

5
tests/manual_test.py

@ -0,0 +1,5 @@ @@ -0,0 +1,5 @@
from duplo_controller.cli import main
if __name__ == "__main__":
main()
Loading…
Cancel
Save