You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

1240 lines
47 KiB

import asyncio
import json
import threading
import time
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pygame
# Optional train library. When it cannot be imported the flag is False and
# every Duplo symbol is bound to None; TrainProxy then reports a simulated
# connection instead of talking to a hub.
try:
    from duplo_controller import DuploColor, DuploSound, DuploTrainHub
except Exception:
    HAS_DUPLO_LIB = False
    DuploTrainHub = None
    DuploColor = None
    DuploSound = None
else:
    HAS_DUPLO_LIB = True

# Optional BLE scanner used to discover trains; without it no scan is run.
try:
    from bleak import BleakScanner
except Exception:
    HAS_BLEAK = False
    BleakScanner = None
else:
    HAS_BLEAK = True
# Window size in pixels and the frame-rate cap handed to the pygame clock.
WIDTH = 1280
HEIGHT = 720
FPS = 60

# Seconds (measured with time.time()) before the splash screen gives way to
# the menu on its own.
SPLASH_DURATION = 2.5

# RGB palette shared by every screen.
BG = (14, 18, 26)
PANEL_BG = (25, 33, 46)
PANEL_BORDER = (59, 76, 103)
TXT = (235, 240, 245)
GOOD = (86, 209, 124)
WARN = (232, 188, 79)
BAD = (225, 98, 98)
class GameState(Enum):
    """Screen currently shown by the main loop; exactly one is active."""

    SPLASH = auto()      # title screen shown at start-up
    MENU = auto()        # player count, train selection and connection
    CONTROL = auto()     # driving the connected trains
    JOY_CONFIG = auto()  # joystick button remapping
    KEY_CONFIG = auto()  # keyboard remapping
class AsyncLoopThread:
    """Own an asyncio event loop that runs forever on a daemon thread.

    Synchronous code hands coroutines to :meth:`run` and gets back a
    ``concurrent.futures.Future`` it can wait on or cancel.
    """

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        """Thread target: install the loop, serve it, close it once stopped."""
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        finally:
            # Close on the owning thread so the loop's resources are released
            # as soon as it stops instead of lingering until interpreter exit.
            self._loop.close()

    def run(self, coro):
        """Schedule *coro* on the background loop and return its future."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self):
        """Ask the loop to stop and wait up to one second for the thread.

        Safe to call more than once: a loop that is already closed is left
        alone.
        """
        if not self._loop.is_closed():
            try:
                self._loop.call_soon_threadsafe(self._loop.stop)
            except RuntimeError:
                # The loop closed between the check and the call; there is
                # nothing left to stop.
                pass
        self._thread.join(timeout=1.0)
@dataclass
class PlayerBindings:
    """Keyboard key codes bound to each control of one player.

    Direction, start and select fields are lists of key codes; any key in
    the list triggers the control. ``actions`` holds one key code per
    action button, in action order.
    """

    # Directions.
    up: List[int]
    down: List[int]
    left: List[int]
    right: List[int]
    # One key code per action button (A1, A2, ...).
    actions: List[int]
    # Menu-style buttons.
    start: List[int]
    select: List[int]
@dataclass
class PlayerInputState:
    """Snapshot of one player's controls for a single frame.

    Plain / ``*_down`` fields say whether a control is currently held.
    ``*_pressed`` fields are filled by the input manager and are true only
    on the frame where the control went from released to held.
    """

    up: bool = False
    down: bool = False
    left: bool = False
    left_pressed: bool = False
    right: bool = False
    right_pressed: bool = False
    # None is a placeholder: each instance gets its own list in __post_init__.
    actions_down: List[bool] = None
    actions_pressed: List[bool] = None
    start_down: bool = False
    start_pressed: bool = False
    select_down: bool = False
    select_pressed: bool = False

    def __post_init__(self):
        """Give every instance its own six-entry action lists when omitted."""
        for name in ("actions_down", "actions_pressed"):
            if getattr(self, name) is None:
                setattr(self, name, [False] * 6)
class InputManager:
    """Poll keyboard and joystick input for two players and own their bindings.

    ``update()`` must be called once per frame. It rebuilds ``states`` from
    the current keyboard/joystick state and keeps the previous frame in
    ``prev`` so that ``*_pressed`` flags can be derived.
    """

    _ACTION_COUNT = 6
    _NAMED_SLOTS = ("UP", "DOWN", "LEFT", "RIGHT", "START", "SELECT")
    _PLAYER_KEYS = ((0, "player1"), (1, "player2"))

    def __init__(self):
        # Two independent copies: `bindings` is edited by the user,
        # `default_bindings` is what reset_key_bindings() restores.
        self.bindings = self._default_key_bindings()
        self.default_bindings = self._default_key_bindings()
        self.prev: Dict[int, PlayerInputState] = {0: PlayerInputState(), 1: PlayerInputState()}
        self.states: Dict[int, PlayerInputState] = {0: PlayerInputState(), 1: PlayerInputState()}
        self.joy_bindings = {
            0: self._default_joy_binding(),
            1: self._default_joy_binding(),
        }
        pygame.joystick.init()
        self.joysticks = []
        self.refresh_joysticks()

    @staticmethod
    def _default_key_bindings():
        """Return a fresh ``{player: PlayerBindings}`` with the factory keys."""
        return {
            0: PlayerBindings(
                up=[pygame.K_UP],
                down=[pygame.K_DOWN],
                left=[pygame.K_LEFT],
                right=[pygame.K_RIGHT],
                actions=[pygame.K_u, pygame.K_i, pygame.K_o, pygame.K_j, pygame.K_k, pygame.K_l],
                start=[pygame.K_RSHIFT],
                select=[pygame.K_RCTRL],
            ),
            1: PlayerBindings(
                up=[pygame.K_z],
                down=[pygame.K_s],
                left=[pygame.K_q],
                right=[pygame.K_d],
                actions=[pygame.K_r, pygame.K_t, pygame.K_y, pygame.K_f, pygame.K_g, pygame.K_h],
                start=[pygame.K_RETURN],
                select=[pygame.K_BACKSPACE],
            ),
        }

    @staticmethod
    def _default_joy_binding():
        """Return a fresh default joystick map; -1 means "no button bound"."""
        return {
            "actions": [0, 1, 2, 3, 4, 5],
            "select": 6,
            "start": 7,
            "up": -1,
            "down": -1,
            "left": -1,
            "right": -1,
        }

    @staticmethod
    def _is_index(value, minimum=0):
        """Tell whether *value* is a real integer that is >= *minimum*.

        ``bool`` is a subclass of ``int``, so JSON ``true``/``false`` would
        otherwise be accepted as button 1 / key 0.
        """
        return isinstance(value, int) and not isinstance(value, bool) and value >= minimum

    @classmethod
    def _action_index(cls, slot):
        """Map an ``"A<n>"`` slot name to a 0-based action index, else None."""
        if not slot.startswith("A"):
            return None
        try:
            idx = int(slot[1:]) - 1
        except ValueError:
            # e.g. "ABC": not an action slot, ignore instead of crashing.
            return None
        return idx if 0 <= idx < cls._ACTION_COUNT else None

    def refresh_joysticks(self):
        """Re-enumerate the connected joysticks (call after hot-plug events)."""
        self.joysticks = []
        for idx in range(pygame.joystick.get_count()):
            joy = pygame.joystick.Joystick(idx)
            joy.init()
            self.joysticks.append(joy)

    def _joy_for_player(self, player_idx: int):
        """Return the joystick at position *player_idx*, or None if absent."""
        return self.joysticks[player_idx] if player_idx < len(self.joysticks) else None

    def player_for_instance_id(self, instance_id: int) -> Optional[int]:
        """Return the player whose joystick has *instance_id*, or None."""
        for player, joy in enumerate(self.joysticks):
            if joy.get_instance_id() == instance_id:
                return player
        return None

    def set_joy_binding(self, player: int, slot: str, button: int):
        """Bind joystick *button* to *slot* ("UP", "A1".."A6", "START", ...).

        Unknown players and unknown slots are ignored.
        """
        mapping = self.joy_bindings.get(player)
        if mapping is None:
            return
        if slot in self._NAMED_SLOTS:
            mapping[slot.lower()] = button
            return
        idx = self._action_index(slot)
        if idx is not None:
            mapping["actions"][idx] = button

    def reset_joy_bindings(self, player: int):
        """Restore the default joystick map for *player*."""
        self.joy_bindings[player] = self._default_joy_binding()

    def get_joy_bindings(self, player: int):
        """Return the live joystick map of *player* (not a copy)."""
        return self.joy_bindings[player]

    def set_key_binding(self, player: int, slot: str, key: int):
        """Bind keyboard *key* to *slot*; unknown players/slots are ignored."""
        if player not in self.bindings:
            return
        b = self.bindings[player]
        if slot in self._NAMED_SLOTS:
            setattr(b, slot.lower(), [key])
            return
        idx = self._action_index(slot)
        if idx is not None:
            b.actions[idx] = key

    def reset_key_bindings(self, player: int):
        """Restore the default keyboard bindings for *player*."""
        d = self.default_bindings[player]
        # Copy every list so later edits never touch the stored defaults.
        self.bindings[player] = PlayerBindings(
            up=list(d.up),
            down=list(d.down),
            left=list(d.left),
            right=list(d.right),
            actions=list(d.actions),
            start=list(d.start),
            select=list(d.select),
        )

    def get_key_binding_rows(self, player: int):
        """Return ``(slot, key_code)`` pairs in the order the UI lists them."""
        b = self.bindings[player]
        rows = [
            ("UP", b.up[0]),
            ("DOWN", b.down[0]),
            ("LEFT", b.left[0]),
            ("RIGHT", b.right[0]),
        ]
        rows.extend((f"A{n + 1}", b.actions[n]) for n in range(self._ACTION_COUNT))
        rows.append(("SELECT", b.select[0]))
        rows.append(("START", b.start[0]))
        return rows

    def export_joy_bindings(self):
        """Return the joystick maps keyed ``player1`` / ``player2``."""
        return {
            "player1": self.joy_bindings[0],
            "player2": self.joy_bindings[1],
        }

    def export_key_bindings(self):
        """Return the keyboard bindings as plain JSON-friendly dicts."""
        out = {}
        for player, key in self._PLAYER_KEYS:
            b = self.bindings[player]
            out[key] = {
                "up": b.up[0],
                "down": b.down[0],
                "left": b.left[0],
                "right": b.right[0],
                "actions": list(b.actions),
                "start": b.start[0],
                "select": b.select[0],
            }
        return out

    def import_joy_bindings(self, data: dict):
        """Load joystick maps from *data*; a player's entry is applied only
        when every value in it validates, otherwise that player is skipped.
        """
        for player, key in self._PLAYER_KEYS:
            raw = data.get(key)
            if not isinstance(raw, dict):
                continue
            actions = raw.get("actions", [])
            if not isinstance(actions, list) or len(actions) != self._ACTION_COUNT:
                continue
            if not all(self._is_index(b) for b in actions):
                continue
            start = raw.get("start")
            select = raw.get("select")
            if not (self._is_index(start) and self._is_index(select)):
                continue
            # Directions are optional and may be -1 (disabled).
            directions = {name: raw.get(name, -1) for name in ("up", "down", "left", "right")}
            if not all(self._is_index(v, minimum=-1) for v in directions.values()):
                continue
            self.joy_bindings[player] = {
                "actions": list(actions),
                "start": start,
                "select": select,
                **directions,
            }

    def import_key_bindings(self, data: dict):
        """Load keyboard bindings from *data*; invalid player entries are skipped."""
        required = ("up", "down", "left", "right", "start", "select")
        for player, key in self._PLAYER_KEYS:
            raw = data.get(key)
            if not isinstance(raw, dict):
                continue
            actions = raw.get("actions")
            if not isinstance(actions, list) or len(actions) != self._ACTION_COUNT:
                continue
            if not all(self._is_index(raw.get(name)) for name in required):
                continue
            if not all(self._is_index(a) for a in actions):
                continue
            self.bindings[player] = PlayerBindings(
                up=[raw["up"]],
                down=[raw["down"]],
                left=[raw["left"]],
                right=[raw["right"]],
                actions=list(actions),
                start=[raw["start"]],
                select=[raw["select"]],
            )

    @staticmethod
    def _key_any_pressed(keys, keycodes):
        """Return True when any key code in *keycodes* is held in *keys*."""
        return any(keys[k] for k in keycodes)

    @staticmethod
    def _apply_joystick(s, joy, joy_map):
        """OR the joystick's hat, axes and mapped buttons into state *s*."""
        try:
            hat_x, hat_y = joy.get_hat(0)
        except Exception:
            # No hat on this device (or the read failed): treat as centred.
            hat_x, hat_y = 0, 0
        axis_count = joy.get_numaxes()
        axis_x = joy.get_axis(0) if axis_count > 0 else 0
        axis_y = joy.get_axis(1) if axis_count > 1 else 0
        button_count = joy.get_numbuttons()

        def held(button):
            # Negative means "unbound"; indexes past the device's button
            # count are ignored so a mapping made for another pad is harmless.
            return 0 <= button < button_count and bool(joy.get_button(button))

        s.up = s.up or hat_y > 0 or axis_y < -0.5 or held(joy_map.get("up", -1))
        s.down = s.down or hat_y < 0 or axis_y > 0.5 or held(joy_map.get("down", -1))
        s.left = s.left or hat_x < 0 or axis_x < -0.5 or held(joy_map.get("left", -1))
        s.right = s.right or hat_x > 0 or axis_x > 0.5 or held(joy_map.get("right", -1))
        for i, btn in enumerate(joy_map["actions"]):
            if held(btn):
                s.actions_down[i] = True
        if held(joy_map["start"]):
            s.start_down = True
        if held(joy_map["select"]):
            s.select_down = True

    def update(self):
        """Sample keyboard and joysticks and rebuild both players' states."""
        keys = pygame.key.get_pressed()
        self.prev = dict(self.states)
        self.states = {0: PlayerInputState(), 1: PlayerInputState()}
        for player in (0, 1):
            b = self.bindings[player]
            s = self.states[player]
            s.up = self._key_any_pressed(keys, b.up)
            s.down = self._key_any_pressed(keys, b.down)
            s.left = self._key_any_pressed(keys, b.left)
            s.right = self._key_any_pressed(keys, b.right)
            s.actions_down = [self._key_any_pressed(keys, [k]) for k in b.actions]
            s.start_down = self._key_any_pressed(keys, b.start)
            s.select_down = self._key_any_pressed(keys, b.select)
            joy = self._joy_for_player(player)
            if joy is not None:
                self._apply_joystick(s, joy, self.joy_bindings[player])
            # Edge detection: "pressed" means held now but not last frame.
            prev = self.prev[player]
            s.left_pressed = s.left and not prev.left
            s.right_pressed = s.right and not prev.right
            s.actions_pressed = [now and not prev.actions_down[i] for i, now in enumerate(s.actions_down)]
            s.start_pressed = s.start_down and not prev.start_down
            s.select_pressed = s.select_down and not prev.select_down

    def state(self, player_idx: int) -> PlayerInputState:
        """Return the state computed for *player_idx* by the last update()."""
        return self.states[player_idx]
class TrainProxy:
    """Synchronous facade over one train hub driven on the BLE loop thread.

    When the train library is unavailable (``HAS_DUPLO_LIB`` is False) no hub
    is created: connect/disconnect succeed as a simulation and every command
    is a no-op. Speed commands are rate-limited to one per
    ``min_command_interval`` seconds; the latest requested speed is kept in
    ``pending_speed`` until ``flush_speed()`` is allowed to send it.
    """

    def __init__(self, loop_thread: AsyncLoopThread, address: Optional[str] = None):
        self.loop_thread = loop_thread
        self.address = address
        self.connected = False
        self.last_speed = 0
        self.last_sent = 0.0
        self.pending_speed: Optional[int] = None
        self.min_command_interval = 0.12
        self._hub = DuploTrainHub(address=address) if HAS_DUPLO_LIB else None

    def set_address(self, address: Optional[str]):
        """Target another hub address; builds a new hub object for it."""
        self.address = address
        if HAS_DUPLO_LIB:
            self._hub = DuploTrainHub(address=address)

    def _submit(self, coro):
        """Schedule *coro* on the BLE loop; returns its future (None in simulation)."""
        if not HAS_DUPLO_LIB:
            return None
        return self.loop_thread.run(coro)

    def _run_blocking(self, coro, timeout):
        """Run *coro* on the BLE loop and wait for it, cancelling on failure.

        Without the cancel, a call that timed out would keep running in the
        background and could complete later while this proxy still reports
        the old connection state.
        """
        fut = self._submit(coro)
        try:
            return fut.result(timeout=timeout)
        except Exception:
            fut.cancel()  # no-op when the future already finished
            raise

    @staticmethod
    def _describe(exc):
        """Return a non-empty text for *exc* (``TimeoutError`` has no message)."""
        return str(exc) or type(exc).__name__

    def connect(self, timeout=10.0) -> Tuple[bool, str]:
        """Connect to the hub, waiting at most *timeout* seconds.

        Returns ``(ok, message)``; never raises for hub/BLE errors.
        """
        if self.connected:
            return True, "déjà connecté"
        if not HAS_DUPLO_LIB:
            self.connected = True
            return True, "simulation (duploController non trouvé)"
        try:
            self._run_blocking(self._hub.connect(), timeout)
        except Exception as exc:
            return False, f"erreur connexion: {self._describe(exc)}"
        self.connected = True
        return True, "connecté"

    def disconnect(self, timeout=5.0) -> Tuple[bool, str]:
        """Disconnect from the hub, waiting at most *timeout* seconds.

        Returns ``(ok, message)``; on failure the proxy stays marked connected.
        """
        if not self.connected:
            return True, "déjà déconnecté"
        if not HAS_DUPLO_LIB:
            self.connected = False
            return True, "simulation déconnectée"
        try:
            self._run_blocking(self._hub.disconnect(), timeout)
        except Exception as exc:
            return False, f"erreur déconnexion: {self._describe(exc)}"
        self.connected = False
        self.last_speed = 0
        return True, "déconnecté"

    def set_speed(self, speed: int):
        """Request *speed* (clamped to -100..100) and try to send it now."""
        speed = max(-100, min(100, speed))
        if speed == self.last_speed and self.pending_speed is None:
            return
        self.pending_speed = speed
        self.flush_speed()

    def flush_speed(self):
        """Send the pending speed if the rate limit allows it."""
        if self.pending_speed is None:
            return
        now = time.time()
        if now - self.last_sent < self.min_command_interval:
            return
        speed = self.pending_speed
        self.pending_speed = None
        # Bookkeeping is updated even when nothing can be sent so the
        # request is not retried on every frame.
        self.last_speed = speed
        self.last_sent = now
        if not self.connected or not HAS_DUPLO_LIB:
            return
        self._submit(self._hub.set_motor_speed(speed))

    def stop(self):
        """Drop any pending speed and send a stop immediately (not rate-limited)."""
        self.pending_speed = None
        self.last_speed = 0
        self.last_sent = time.time()
        if not self.connected or not HAS_DUPLO_LIB:
            return
        self._submit(self._hub.stop())

    def play_sound(self, sound):
        """Ask the hub to play *sound*; ignored when not connected."""
        if not self.connected or not HAS_DUPLO_LIB:
            return
        self._submit(self._hub.play_sound(sound))

    def set_light(self, color):
        """Ask the hub to switch its light to *color*; ignored when not connected."""
        if not self.connected or not HAS_DUPLO_LIB:
            return
        self._submit(self._hub.change_light_color(color))
class DuploGame:
def __init__(self):
    """Create the window, fonts, input manager, BLE loop and UI state, then
    load any saved joystick / keyboard mappings from disk.
    """
    pygame.init()
    pygame.display.set_caption("Duplo Arcade Controller")
    self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
    self.clock = pygame.time.Clock()

    make_font = pygame.font.SysFont
    self.title_font = make_font("arial", 56, bold=True)
    self.big_font = make_font("arial", 36, bold=True)
    self.font = make_font("arial", 24)
    self.small_font = make_font("arial", 20)
    self.splash_image = self._load_splash_image()

    # Main-loop state.
    self.state = GameState.SPLASH
    self.splash_started = time.time()
    self.running = True
    self.frame_events = []

    self.input = InputManager()
    self.joy_config_path = Path("joystick_mappings.json")
    self.key_config_path = Path("keyboard_mappings.json")

    # One BLE loop thread shared by both train proxies.
    self.ble_loop = AsyncLoopThread()
    self.player_count = 1
    self.trains = [TrainProxy(self.ble_loop) for _ in range(2)]
    self.addresses = ["", ""]
    self.status_lines = ["appuyer ACTION 1 pour connecter", "disponible si mode 2 joueurs"]

    # Per-player driving state.
    self.speed_targets = [0, 0]
    self.color_index = [0, 0]
    self.speed_adjust_interval = 0.08
    self.speed_adjust_step = 4
    self.last_speed_adjust = [0.0, 0.0]

    # Both configuration screens list the same slots and start with the
    # same help line; each screen gets its own list object.
    slot_names = ["UP", "DOWN", "LEFT", "RIGHT", "A1", "A2", "A3", "A4", "A5", "A6", "SELECT", "START"]
    help_line = "ACTION 1 pour modifier, SELECT pour reset joueur, START pour retour menu"

    self.config_player = 0
    self.config_slot_idx = 0
    self.config_slots = list(slot_names)
    self.awaiting_joy_button = False
    self.config_message = help_line

    self.key_config_player = 0
    self.key_config_slot_idx = 0
    self.key_config_slots = list(slot_names)
    self.awaiting_key_press = False
    self.key_config_message = help_line

    # Train discovery results and each player's selection in them.
    self.discovered_trains: List[Dict[str, object]] = []
    self.selected_train_index = [0, 0]
    self.scan_attempted = False

    # Loaded last: these overwrite the config messages set above.
    self._load_joy_mappings()
    self._load_key_mappings()

    # Light colours cycled with LEFT/RIGHT; empty without the train library.
    self.colors = (
        [
            DuploColor.PINK,
            DuploColor.PURPLE,
            DuploColor.BLUE,
            DuploColor.LIGHTBLUE,
            DuploColor.CYAN,
            DuploColor.GREEN,
            DuploColor.YELLOW,
            DuploColor.ORANGE,
            DuploColor.RED,
            DuploColor.WHITE,
        ]
        if HAS_DUPLO_LIB
        else []
    )
def run(self):
    """Run the main loop until ``self.running`` becomes False.

    Each frame: read events, refresh input, update then draw the screen
    that was active at the start of the frame, flip the display and cap the
    frame rate at ``FPS``. ``_shutdown()`` is called in a ``finally`` so it
    still runs when a handler raises.
    """
    # (update, draw) pair for every screen.
    screens = {
        GameState.SPLASH: (self._update_splash, self._draw_splash),
        GameState.MENU: (self._update_menu, self._draw_menu),
        GameState.CONTROL: (self._update_control, self._draw_control),
        GameState.JOY_CONFIG: (self._update_joy_config, self._draw_joy_config),
        GameState.KEY_CONFIG: (self._update_key_config, self._draw_key_config),
    }
    try:
        while self.running:
            self._handle_events()
            self.input.update()
            # Look the pair up once: an update that switches screens is
            # still followed by the draw of the screen it belonged to.
            handlers = screens.get(self.state)
            if handlers is not None:
                update, draw = handlers
                update()
                draw()
            pygame.display.flip()
            self.clock.tick(FPS)
    finally:
        self._shutdown()
def _handle_events(self):
    """Drain the pygame event queue into ``frame_events`` and react to
    quit requests, menu keyboard shortcuts and joystick hot-plug events.
    """
    self.frame_events = pygame.event.get()
    hotplug_types = (pygame.JOYDEVICEADDED, pygame.JOYDEVICEREMOVED)
    for event in self.frame_events:
        kind = event.type
        if kind == pygame.QUIT:
            self.running = False
            continue
        if kind == pygame.KEYDOWN:
            if event.key == pygame.K_ESCAPE:
                self.running = False
            elif event.key == pygame.K_c and self.state == GameState.MENU:
                # Shortcut: C opens the joystick configuration screen.
                self.state = GameState.JOY_CONFIG
                self.awaiting_joy_button = False
                self.config_message = "Config joystick: ACTION 1 pour modifier"
            elif event.key == pygame.K_k and self.state == GameState.MENU:
                # Shortcut: K opens the keyboard configuration screen.
                self.state = GameState.KEY_CONFIG
                self.awaiting_key_press = False
                self.key_config_message = "Config clavier: ACTION 1 pour modifier"
            continue
        if kind in hotplug_types:
            self.input.refresh_joysticks()
def _update_splash(self):
    """Leave the splash screen after ``SPLASH_DURATION`` seconds, or at once
    when either player presses START or player 1 presses ACTION 1.
    """
    first = self.input.state(0)
    second = self.input.state(1)
    timed_out = time.time() - self.splash_started >= SPLASH_DURATION
    skipped = first.start_pressed or second.start_pressed or first.actions_pressed[0]
    if timed_out or skipped:
        self.state = GameState.MENU
def _update_menu(self):
    """Handle one frame of the connection menu: player count, train
    selection, connect/disconnect, start of the control screen and entry
    into the two configuration screens.
    """
    p0 = self.input.state(0)
    p1 = self.input.state(1)

    # The scan is attempted once, the first time the menu is updated.
    if not self.scan_attempted:
        self._scan_available_trains()

    # DOWN is applied after UP, so it wins when both are held.
    if p0.up or p1.up:
        self.player_count = 1
    if p0.down or p1.down:
        self.player_count = 2
    two_players = self.player_count == 2

    if p0.left_pressed:
        self._cycle_selected_train(0, -1)
    if p0.right_pressed:
        self._cycle_selected_train(0, +1)
    if two_players and p1.left_pressed:
        self._cycle_selected_train(1, -1)
    if two_players and p1.right_pressed:
        self._cycle_selected_train(1, +1)

    if p0.actions_pressed[0]:
        self._connect_selected_or_toggle(0)
    if two_players and p1.actions_pressed[0]:
        self._connect_selected_or_toggle(1)

    if p0.start_pressed:
        required = 1 if self.player_count == 1 else 2
        active_trains = self.trains[: self.player_count]
        connected = sum(1 for train in active_trains if train.connected)
        if connected < required:
            self.status_lines[0] = "connectez les trains requis avant START"
        else:
            self.state = GameState.CONTROL
            self.status_lines[0] = "contrôle actif"
            if self.player_count == 2:
                self.status_lines[1] = "contrôle actif"

    # Evaluated after START on purpose: a later match overrides the state.
    if p0.select_pressed:
        self.state = GameState.JOY_CONFIG
        self.awaiting_joy_button = False
        self.config_message = "Config joystick: ACTION 1 pour modifier"
    if p0.actions_pressed[1]:
        self.state = GameState.KEY_CONFIG
        self.awaiting_key_press = False
        self.key_config_message = "Config clavier: ACTION 1 pour modifier"
async def _discover_duplo_trains_async(self) -> List[Dict[str, object]]:
    """Scan BLE for four seconds and return the hubs recognised as trains.

    A device is kept when its manufacturer data for company id 0x0397
    carries 0x20 in one of its first two bytes. Each result is a dict with
    ``address``, ``name`` and ``rssi``; results are unique per address and
    sorted strongest signal first (unknown RSSI last). Returns an empty
    list when bleak is unavailable.
    """
    if not HAS_BLEAK:
        return []
    found = await BleakScanner.discover(timeout=4.0, return_adv=True)
    by_address = {}
    for device, advertisement in found.values():
        manufacturer_data = getattr(advertisement, "manufacturer_data", {}) or {}
        lego_data = manufacturer_data.get(0x0397)
        if not isinstance(lego_data, (bytes, bytearray)):
            continue
        # Byte 1 is checked as well as byte 0, matching both layouts the
        # original code accepted.
        if 0x20 not in lego_data[:2]:
            continue
        # With return_adv=True the signal strength is reported on the
        # advertisement; the device attribute is only a fallback for old
        # bleak versions and may be missing on recent ones.
        rssi = getattr(advertisement, "rssi", None)
        if rssi is None:
            rssi = getattr(device, "rssi", None)
        name = device.name or getattr(advertisement, "local_name", None) or "DUPLO Hub"
        by_address[device.address] = {
            "address": device.address,
            "name": name,
            "rssi": rssi,
        }

    def signal_strength(item):
        value = item.get("rssi")
        return value if isinstance(value, int) else -999

    return sorted(by_address.values(), key=signal_strength, reverse=True)
def _scan_available_trains(self):
    """Run a blocking BLE scan (up to 8 s) and store the trains found.

    Updates ``discovered_trains``, resets both players' selection and
    writes the outcome to ``status_lines[0]``. Never raises: any failure is
    reported in the status line and leaves the train list empty.
    """
    self.scan_attempted = True
    if not HAS_BLEAK:
        self.status_lines[0] = "BLE scanner indisponible (bleak introuvable)"
        return
    fut = None
    try:
        fut = self.ble_loop.run(self._discover_duplo_trains_async())
        self.discovered_trains = fut.result(timeout=8.0)
        self.selected_train_index = [0, 0]
        if self.discovered_trains:
            self.status_lines[0] = f"{len(self.discovered_trains)} train(s) trouvé(s)"
            # Give player 2 a different default train when one exists.
            if self.player_count == 2 and len(self.discovered_trains) > 1:
                self.selected_train_index[1] = 1
        else:
            self.status_lines[0] = "aucun train trouvé (vérifier alimentation/BLE)"
    except Exception as exc:
        if fut is not None:
            # Stop a scan that is still running (e.g. after a timeout);
            # cancelling a finished future does nothing.
            fut.cancel()
        self.discovered_trains = []
        # TimeoutError has an empty message; fall back to the class name.
        detail = str(exc) or type(exc).__name__
        self.status_lines[0] = f"erreur scan BLE: {detail}"
def _cycle_selected_train(self, player: int, delta: int):
if not self.discovered_trains:
return
count = len(self.discovered_trains)
self.selected_train_index[player] = (self.selected_train_index[player] + delta) % count
def _selected_train_for_player(self, player: int) -> Optional[Dict[str, object]]:
if not self.discovered_trains:
return None
idx = self.selected_train_index[player] % len(self.discovered_trains)
return self.discovered_trains[idx]
def _connect_selected_or_toggle(self, player: int):
if player == 1 and self.player_count == 1:
return
train = self.trains[player]
if train.connected:
ok, msg = train.disconnect()
self.status_lines[player] = ("OK: " if ok else "KO: ") + msg
return
if not self.discovered_trains:
self._scan_available_trains()
if not self.discovered_trains:
self.status_lines[player] = "KO: aucun train disponible"
return
chosen = self._selected_train_for_player(player)
if chosen is None:
self.status_lines[player] = "KO: sélection invalide"
return
chosen_addr = chosen["address"]
other = 1 - player
if self.trains[other].connected and self.trains[other].address == chosen_addr:
self.status_lines[player] = "KO: ce train est déjà utilisé par l'autre joueur"
return
train.set_address(chosen_addr)
ok, msg = train.connect()
train_name = chosen.get("name", "DUPLO Hub")
self.status_lines[player] = ("OK: " if ok else "KO: ") + f"{train_name} {msg}"
def _toggle_connect(self, player: int):
if player == 1 and self.player_count == 1:
return
train = self.trains[player]
if train.connected:
ok, msg = train.disconnect()
else:
ok, msg = train.connect()
self.status_lines[player] = ("OK: " if ok else "KO: ") + msg
def _sound_for_action(self, idx: int):
    """Return the sound bound to action button *idx* (0-based), or None.

    None is returned when the train library is missing, when the button has
    no sound (index 5) and when *idx* is outside 0..5. The range check keeps
    a negative index from silently wrapping to another button's sound.
    """
    if not HAS_DUPLO_LIB:
        return None
    sounds = (
        DuploSound.HORN,
        DuploSound.BRAKE,
        DuploSound.STEAM,
        DuploSound.STATION_DEPARTURE,
        DuploSound.WATER_REFILL,
        None,
    )
    if not 0 <= idx < len(sounds):
        return None
    return sounds[idx]
def _update_control(self):
    """Handle one frame of the driving screen for every active player:
    speed ramping, stop, sounds, light colour and return to the menu.
    """
    now = time.time()
    for player in range(self.player_count):
        state = self.input.state(player)
        train = self.trains[player]

        # +1 for UP, -1 for DOWN; both held cancel out.
        direction = (1 if state.up else 0) - (1 if state.down else 0)
        ramp_due = now - self.last_speed_adjust[player] >= self.speed_adjust_interval
        if direction != 0 and ramp_due:
            target = self.speed_targets[player] + direction * self.speed_adjust_step
            self.speed_targets[player] = max(-100, min(100, target))
            train.set_speed(self.speed_targets[player])
            self.last_speed_adjust[player] = now
        # Give a rate-limited speed command another chance every frame.
        train.flush_speed()

        if state.actions_pressed[5]:
            self.speed_targets[player] = 0
            train.stop()

        for action_idx, pressed in enumerate(state.actions_pressed[:5]):
            if not pressed:
                continue
            sound = self._sound_for_action(action_idx)
            if sound is not None:
                train.play_sound(sound)

        palette_size = len(self.colors)
        if state.left_pressed and self.colors:
            self.color_index[player] = (self.color_index[player] - 1) % palette_size
            train.set_light(self.colors[self.color_index[player]])
        if state.right_pressed and self.colors:
            self.color_index[player] = (self.color_index[player] + 1) % palette_size
            train.set_light(self.colors[self.color_index[player]])

        if state.select_pressed:
            self.speed_targets[player] = 0
            train.stop()
        if state.start_pressed:
            self.state = GameState.MENU
            self.status_lines[player] = "retour menu"
def _update_joy_config(self):
    """Handle one frame of the joystick configuration screen.

    Outside capture mode player 1 picks the player (LEFT/RIGHT) and slot
    (UP/DOWN), starts a capture with ACTION 1, resets with SELECT or leaves
    with START. In capture mode the first button pressed on the expected
    player's joystick is bound to the slot; SELECT cancels.

    UP/DOWN move the selection once per press: they are compared with the
    previous frame's state, otherwise a held direction would advance one
    slot every frame.
    """
    p0 = self.input.state(0)
    if p0.start_pressed and not self.awaiting_joy_button:
        self.state = GameState.MENU
        self.config_message = "Retour menu"
        return

    if not self.awaiting_joy_button:
        held_before = self.input.prev[0]
        slot_count = len(self.config_slots)
        if p0.left:
            self.config_player = 0
        if p0.right:
            self.config_player = 1
        if p0.up and not held_before.up:
            self.config_slot_idx = (self.config_slot_idx - 1) % slot_count
        if p0.down and not held_before.down:
            self.config_slot_idx = (self.config_slot_idx + 1) % slot_count
        if p0.actions_pressed[0]:
            self.awaiting_joy_button = True
            self.config_message = f"J{self.config_player + 1} {self.config_slots[self.config_slot_idx]}: appuyez sur un bouton joystick"
        elif p0.select_pressed:
            self.input.reset_joy_bindings(self.config_player)
            self._save_joy_mappings()
            self.config_message = f"J{self.config_player + 1}: mapping joystick remis par défaut"
        return

    # Capture mode from here on.
    if p0.select_pressed:
        self.awaiting_joy_button = False
        self.config_message = "Capture annulée"
        return

    expected_player = self.config_player
    slot = self.config_slots[self.config_slot_idx]
    for event in self.frame_events:
        if event.type != pygame.JOYBUTTONDOWN:
            continue
        event_player = self.input.player_for_instance_id(event.instance_id)
        if event_player is None:
            continue
        if event_player != expected_player:
            self.config_message = f"Bouton ignoré: joystick joueur {event_player + 1}, attendu joueur {expected_player + 1}"
            continue
        self.input.set_joy_binding(expected_player, slot, event.button)
        self._save_joy_mappings()
        self.awaiting_joy_button = False
        self.config_message = f"J{expected_player + 1} {slot} = bouton {event.button}"
        break
def _update_key_config(self):
    """Handle one frame of the keyboard configuration screen.

    Outside capture mode player 1 picks the player (LEFT/RIGHT) and slot
    (UP/DOWN), starts a capture with ACTION 1, resets with SELECT or leaves
    with START. In capture mode the first key pressed (Escape excluded) is
    bound to the slot; SELECT cancels.

    UP/DOWN move the selection once per press: they are compared with the
    previous frame's state, otherwise a held direction would advance one
    slot every frame.
    """
    p0 = self.input.state(0)
    if p0.start_pressed and not self.awaiting_key_press:
        self.state = GameState.MENU
        self.key_config_message = "Retour menu"
        return

    if not self.awaiting_key_press:
        held_before = self.input.prev[0]
        slot_count = len(self.key_config_slots)
        if p0.left:
            self.key_config_player = 0
        if p0.right:
            self.key_config_player = 1
        if p0.up and not held_before.up:
            self.key_config_slot_idx = (self.key_config_slot_idx - 1) % slot_count
        if p0.down and not held_before.down:
            self.key_config_slot_idx = (self.key_config_slot_idx + 1) % slot_count
        if p0.actions_pressed[0]:
            self.awaiting_key_press = True
            slot = self.key_config_slots[self.key_config_slot_idx]
            self.key_config_message = f"J{self.key_config_player + 1} {slot}: appuyez sur une touche clavier"
        elif p0.select_pressed:
            self.input.reset_key_bindings(self.key_config_player)
            self._save_key_mappings()
            self.key_config_message = f"J{self.key_config_player + 1}: mapping clavier remis par défaut"
        return

    # Capture mode from here on.
    if p0.select_pressed:
        self.awaiting_key_press = False
        self.key_config_message = "Capture annulée"
        return

    expected_player = self.key_config_player
    slot = self.key_config_slots[self.key_config_slot_idx]
    for event in self.frame_events:
        if event.type != pygame.KEYDOWN:
            continue
        if event.key == pygame.K_ESCAPE:
            # Escape quits the application and is never bindable.
            continue
        self.input.set_key_binding(expected_player, slot, event.key)
        self._save_key_mappings()
        self.awaiting_key_press = False
        key_name = pygame.key.name(event.key)
        self.key_config_message = f"J{expected_player + 1} {slot} = {key_name}"
        break
def _save_joy_mappings(self):
try:
payload = self.input.export_joy_bindings()
with self.joy_config_path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2)
except Exception as exc:
self.config_message = f"Erreur sauvegarde mappings: {exc}"
def _load_joy_mappings(self):
if not self.joy_config_path.exists():
return
try:
with self.joy_config_path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
if isinstance(data, dict):
self.input.import_joy_bindings(data)
self.config_message = "Mappings joystick chargés depuis joystick_mappings.json"
except Exception as exc:
self.config_message = f"Erreur chargement mappings: {exc}"
def _save_key_mappings(self):
try:
payload = self.input.export_key_bindings()
with self.key_config_path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2)
except Exception as exc:
self.key_config_message = f"Erreur sauvegarde mappings clavier: {exc}"
def _load_key_mappings(self):
if not self.key_config_path.exists():
return
try:
with self.key_config_path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
if isinstance(data, dict):
self.input.import_key_bindings(data)
self.key_config_message = "Mappings clavier chargés depuis keyboard_mappings.json"
except Exception as exc:
self.key_config_message = f"Erreur chargement mappings clavier: {exc}"
def _draw_text(self, surface, text, font, color, x, y, center=False):
s = font.render(text, True, color)
r = s.get_rect()
if center:
r.center = (x, y)
else:
r.topleft = (x, y)
surface.blit(s, r)
def _load_splash_image(self):
    """Load ``img/train.png`` for the splash screen, or return None.

    The image is shrunk (never enlarged) to fit within 55% of the window
    width and 45% of its height. A missing file or any loading error
    yields None so the splash screen simply shows no picture.
    """
    image_path = Path("img") / "train.png"
    if not image_path.exists():
        return None
    try:
        picture = pygame.image.load(image_path.as_posix()).convert_alpha()
        width, height = picture.get_size()
        width_limit = int(WIDTH * 0.55)
        height_limit = int(HEIGHT * 0.45)
        scale = min(width_limit / width, height_limit / height)
        if scale < 1:
            scaled_size = (int(width * scale), int(height * scale))
            picture = pygame.transform.smoothscale(picture, scaled_size)
        return picture
    except Exception:
        return None
def _draw_splash(self):
    """Draw the splash screen: optional picture, title, hint line and a
    warning when the train library is missing.
    """
    self.screen.fill(BG)
    center_x = WIDTH // 2
    middle_y = HEIGHT // 2

    if self.splash_image is None:
        y_title = middle_y - 40
    else:
        # With a picture the title moves below it.
        img_rect = self.splash_image.get_rect(center=(center_x, middle_y - 110))
        self.screen.blit(self.splash_image, img_rect)
        y_title = middle_y + 120

    self._draw_text(self.screen, "DUPLO ARCADE", self.title_font, TXT, center_x, y_title, center=True)
    self._draw_text(
        self.screen,
        "Contrôle clavier/joystick - Appuyer START ou attendre",
        self.font,
        WARN,
        center_x,
        y_title + 64,
        center=True,
    )
    if HAS_DUPLO_LIB:
        return
    self._draw_text(
        self.screen,
        "duploController non trouvé: mode simulation actif",
        self.small_font,
        BAD,
        center_x,
        y_title + 110,
        center=True,
    )
def _draw_menu(self):
    """Draw the connection menu: help lines, a preview of the first three
    discovered trains and one status panel per player.
    """
    screen = self.screen
    screen.fill(BG)
    left = 40

    # (text, font, colour, y) for the fixed header lines.
    header = [
        ("Connexion des trains", self.big_font, TXT, 24),
        ("UP=1 joueur | DOWN=2 joueurs", self.font, WARN, 76),
        (f"Mode actuel: {self.player_count} joueur(s)", self.font, TXT, 110),
        (
            "ACTION 1 sur chaque joueur: connecter/déconnecter | START P1: lancer",
            self.small_font,
            TXT,
            146,
        ),
        ("SELECT P1 ou touche C: config joystick", self.small_font, WARN, 172),
        ("ACTION 2 P1 ou touche K: config clavier", self.small_font, WARN, 198),
        ("LEFT/RIGHT joueur: choisir train trouvé | ACTION 1: connecter", self.small_font, TXT, 224),
    ]
    for text, font, color, line_y in header:
        self._draw_text(screen, text, font, color, left, line_y)

    if self.discovered_trains:
        picked = (self.selected_train_index[0], self.selected_train_index[1])
        preview = []
        for idx, train in enumerate(self.discovered_trains[:3]):
            # A star marks a train selected by either player.
            mark = "*" if idx in picked else " "
            name = train.get("name", "DUPLO Hub")
            addr = train.get("address", "?")
            preview.append(f"{mark}{idx + 1}: {name} [{addr}]")
        self._draw_text(screen, " | ".join(preview), self.small_font, GOOD, left, 252)
    else:
        self._draw_text(screen, "Scan BLE en cours ou aucun train trouvé", self.small_font, BAD, left, 252)

    ctrl_help = [
        "Directions: vitesse (+/-) et lumière (gauche/droite)",
        "Action1..5: sons train",
        "Action6 / Select: stop immédiat",
        "Start: retour menu en jeu",
    ]
    panel_w = (WIDTH - 80 - 20) // 2
    y = 280
    for i in (0, 1):
        x = left + i * (panel_w + 20)
        text_x = x + 16
        active = i < self.player_count
        train = self.trains[i]

        rect = pygame.Rect(x, y, panel_w, 300)
        pygame.draw.rect(screen, PANEL_BG, rect, border_radius=10)
        pygame.draw.rect(screen, PANEL_BORDER, rect, 2, border_radius=10)

        title_color = TXT if active else (130, 130, 130)
        self._draw_text(screen, f"Train Joueur {i + 1}", self.font, title_color, text_x, y + 16)
        if not active:
            self._draw_text(screen, "Inactif (mode 1 joueur)", self.small_font, WARN, text_x, y + 58)
            continue

        if train.connected:
            status, status_color = "Connecté", GOOD
        else:
            status, status_color = "Déconnecté", BAD
        self._draw_text(screen, f"État: {status}", self.font, status_color, text_x, y + 58)
        self._draw_text(screen, self.status_lines[i], self.small_font, TXT, text_x, y + 98)

        chosen = self._selected_train_for_player(i)
        if chosen is None:
            self._draw_text(screen, "Sélection: aucun train", self.small_font, WARN, text_x, y + 126)
        else:
            chosen_name = chosen.get("name", "DUPLO Hub")
            chosen_addr = chosen.get("address", "?")
            self._draw_text(screen, f"Sélection: {chosen_name}", self.small_font, WARN, text_x, y + 126)
            self._draw_text(screen, str(chosen_addr), self.small_font, WARN, text_x, y + 146)

        for n, line in enumerate(ctrl_help):
            self._draw_text(screen, line, self.small_font, (190, 203, 219), text_x, y + 178 + 24 * n)
def _draw_joy_config(self):
    """Render the joystick configuration screen.

    Draws a title and two hint lines, then one panel per player (two
    panels side by side). Each panel shows the player's joystick name
    (or a warning when no joystick is found) followed by the table of
    button bindings. The player matching ``self.config_player`` and the
    row matching ``self.config_slot_idx`` are highlighted. The current
    ``self.config_message`` is printed at the bottom of the screen.
    """
    screen = self.screen
    screen.fill(BG)

    # Header: title followed by the navigation hints.
    self._draw_text(screen, "Configuration Joystick", self.big_font, TXT, 40, 24)
    hints = (
        ("LEFT/RIGHT: joueur | UP/DOWN: fonction | ACTION 1: remapper", 78),
        ("SELECT: reset joueur | START: retour menu", 104),
    )
    for hint, hint_y in hints:
        self._draw_text(screen, hint, self.small_font, WARN, 40, hint_y)

    # Two equal-width panels with 40 px outer margins and a 20 px gap.
    panel_w = (WIDTH - 80 - 20) // 2
    top = 150
    for player in range(2):
        left = 40 + player * (panel_w + 20)
        panel = pygame.Rect(left, top, panel_w, 460)
        pygame.draw.rect(screen, PANEL_BG, panel, border_radius=12)
        pygame.draw.rect(screen, PANEL_BORDER, panel, 2, border_radius=12)

        is_current = player == self.config_player
        heading_color = GOOD if is_current else TXT
        self._draw_text(screen, f"Joueur {player + 1}", self.font, heading_color, left + 16, top + 14)

        # Joystick line: device name, or a warning when none is available.
        joy = self.input._joy_for_player(player)
        if joy is not None:
            joy_line = f"Joystick: {joy.get_name()}"
            joy_color = TXT
        else:
            joy_line = "Aucun joystick détecté pour ce joueur"
            joy_color = BAD
        self._draw_text(screen, joy_line, self.small_font, joy_color, left + 16, top + 46)

        # Binding table. Directions fall back to -1 when the mapping has
        # no entry for them; actions/select/start are read directly.
        bindings = self.input.get_joy_bindings(player)
        rows = [
            (label, bindings.get(key, -1))
            for label, key in (("UP", "up"), ("DOWN", "down"), ("LEFT", "left"), ("RIGHT", "right"))
        ]
        action_buttons = bindings["actions"]
        rows.extend((f"A{n}", action_buttons[n - 1]) for n in range(1, 7))
        rows.append(("SELECT", bindings["select"]))
        rows.append(("START", bindings["start"]))

        for row, (slot, button) in enumerate(rows):
            line_y = top + 80 + row * 30
            highlighted = is_current and row == self.config_slot_idx
            if highlighted:
                line_color, prefix = WARN, "> "
            else:
                line_color, prefix = TXT, " "
            # A negative button index is displayed as a disabled binding.
            if button < 0:
                button_label = "désactivé"
            else:
                button_label = f"bouton {button}"
            self._draw_text(screen, f"{prefix}{slot}", self.small_font, line_color, left + 16, line_y)
            self._draw_text(screen, button_label, self.small_font, line_color, left + 150, line_y)

    # Footer message, emphasised while a button press is awaited.
    footer_color = TXT
    if self.awaiting_joy_button:
        footer_color = BAD
    self._draw_text(screen, self.config_message, self.small_font, footer_color, 40, HEIGHT - 46)
def _draw_key_config(self):
    """Render the keyboard configuration screen.

    Draws a title and two hint lines, then one panel per player (two
    panels side by side) listing the rows returned by
    ``self.input.get_key_binding_rows(player)`` as ``(slot, key)`` pairs,
    with the key shown by its pygame key name. The player matching
    ``self.key_config_player`` and the row matching
    ``self.key_config_slot_idx`` are highlighted. The current
    ``self.key_config_message`` is printed at the bottom of the screen.
    """
    screen = self.screen
    screen.fill(BG)

    # Header: title followed by the navigation hints.
    self._draw_text(screen, "Configuration Clavier", self.big_font, TXT, 40, 24)
    hints = (
        ("LEFT/RIGHT: joueur | UP/DOWN: fonction | ACTION 1: remapper", 78),
        ("SELECT: reset joueur | START: retour menu", 104),
    )
    for hint, hint_y in hints:
        self._draw_text(screen, hint, self.small_font, WARN, 40, hint_y)

    # Two equal-width panels with 40 px outer margins and a 20 px gap.
    panel_w = (WIDTH - 80 - 20) // 2
    top = 150
    for player in range(2):
        left = 40 + player * (panel_w + 20)
        panel = pygame.Rect(left, top, panel_w, 460)
        pygame.draw.rect(screen, PANEL_BG, panel, border_radius=12)
        pygame.draw.rect(screen, PANEL_BORDER, panel, 2, border_radius=12)

        is_current = player == self.key_config_player
        heading_color = GOOD if is_current else TXT
        self._draw_text(screen, f"Joueur {player + 1}", self.font, heading_color, left + 16, top + 14)

        bindings = self.input.get_key_binding_rows(player)
        for row, (slot, key_value) in enumerate(bindings):
            line_y = top + 60 + row * 32
            highlighted = is_current and row == self.key_config_slot_idx
            if highlighted:
                line_color, prefix = WARN, "> "
            else:
                line_color, prefix = TXT, " "
            # Resolve the readable key name before drawing the row.
            key_name = pygame.key.name(key_value)
            self._draw_text(screen, f"{prefix}{slot}", self.small_font, line_color, left + 16, line_y)
            self._draw_text(screen, key_name, self.small_font, line_color, left + 170, line_y)

    # Footer message, emphasised while a key press is awaited.
    footer_color = TXT
    if self.awaiting_key_press:
        footer_color = BAD
    self._draw_text(screen, self.key_config_message, self.small_font, footer_color, 40, HEIGHT - 46)
def _draw_control(self):
    """Render the in-game control screen.

    Draws one full-height panel per player slot (two, side by side).
    Slots at or beyond ``self.player_count`` are greyed out and labelled
    inactive. Active slots show the train connection state, the target
    speed from ``self.speed_targets`` with a centre-origin bar (filled to
    the right for speeds >= 0, to the left for negative speeds) and a
    reminder of the controls. A quit hint is drawn in the bottom-right
    corner.
    """
    screen = self.screen
    screen.fill(BG)

    # Two equal-width panels with 40 px outer margins and a 20 px gap.
    panel_w = (WIDTH - 80 - 20) // 2
    top = 40
    panel_h = HEIGHT - 80
    help_lines = (
        "UP/DOWN: accélérer/freiner",
        "LEFT/RIGHT: changer lumière",
        "A1..A5: sons | A6/Select: stop",
        "Start: retour menu",
    )

    for slot in range(2):
        left = 40 + slot * (panel_w + 20)
        panel = pygame.Rect(left, top, panel_w, panel_h)
        pygame.draw.rect(screen, PANEL_BG, panel, border_radius=12)
        pygame.draw.rect(screen, PANEL_BORDER, panel, 2, border_radius=12)

        is_active = slot < self.player_count
        if is_active:
            title_color = TXT
        else:
            title_color = (130, 130, 130)
        self._draw_text(screen, f"Joueur {slot + 1}", self.big_font, title_color, left + 20, top + 20)

        if not is_active:
            # Nothing more to show for a slot without a player.
            self._draw_text(screen, "Zone inactive", self.font, WARN, left + 20, top + 90)
            continue

        # Connection state of this player's train.
        is_connected = self.trains[slot].connected
        if is_connected:
            link_label, link_color = "connecté", GOOD
        else:
            link_label, link_color = "déconnecté", BAD
        self._draw_text(screen, f"Train: {link_label}", self.font, link_color, left + 20, top + 90)

        speed = self.speed_targets[slot]
        self._draw_text(screen, f"Vitesse cible: {speed}", self.font, TXT, left + 20, top + 130)

        # Speed bar: background, centre marker, then the filled portion.
        bar_x = left + 20
        bar_y = top + 170
        bar_w = panel_w - 40
        bar_h = 28
        pygame.draw.rect(screen, (40, 51, 68), (bar_x, bar_y, bar_w, bar_h), border_radius=6)
        mid_x = bar_x + bar_w // 2
        pygame.draw.line(screen, (80, 95, 115), (mid_x, bar_y), (mid_x, bar_y + bar_h), 2)

        # |speed| is scaled so that 100 fills half of the bar.
        fill_w = int((abs(speed) / 100.0) * (bar_w // 2))
        if speed >= 0:
            fill_color, fill_x = GOOD, mid_x
        else:
            fill_color, fill_x = BAD, mid_x - fill_w
        pygame.draw.rect(screen, fill_color, (fill_x, bar_y + 4, fill_w, bar_h - 8), border_radius=5)

        # Control reminders, one line every 30 px.
        for row, text in enumerate(help_lines):
            self._draw_text(screen, text, self.small_font, TXT, left + 20, top + 240 + 30 * row)

    self._draw_text(screen, "ESC pour quitter", self.small_font, WARN, WIDTH - 170, HEIGHT - 35)
def _shutdown(self):
    """Disconnect every train, stop the BLE loop and shut pygame down.

    Each train in ``self.trains`` gets a disconnect attempt with a
    2-second timeout. A failure on one train no longer prevents the
    remaining trains from being disconnected, nor ``self.ble_loop.stop()``
    and ``pygame.quit()`` from running.

    Raises:
        Exception: the first error raised by a ``train.disconnect`` call,
            re-raised only after all cleanup steps have been attempted.
    """
    first_error = None
    try:
        for train in self.trains:
            try:
                train.disconnect(timeout=2.0)
            except Exception as exc:
                # Remember the first failure but keep releasing the others.
                if first_error is None:
                    first_error = exc
    finally:
        # Always stop the BLE loop and pygame, even on KeyboardInterrupt
        # or when stopping the loop itself fails.
        try:
            self.ble_loop.stop()
        finally:
            pygame.quit()
    if first_error is not None:
        raise first_error
if __name__ == "__main__":
    # Script entry point: build the game and hand control to its main loop.
    DuploGame().run()