import asyncio import json import threading import time from dataclasses import dataclass from enum import Enum, auto from pathlib import Path from typing import Dict, List, Optional, Tuple import pygame try: from duplo_controller import DuploColor, DuploSound, DuploTrainHub HAS_DUPLO_LIB = True except Exception: HAS_DUPLO_LIB = False DuploTrainHub = None DuploColor = None DuploSound = None try: from bleak import BleakScanner HAS_BLEAK = True except Exception: HAS_BLEAK = False BleakScanner = None WIDTH, HEIGHT = 1280, 720 FPS = 60 SPLASH_DURATION = 2.5 BG = (14, 18, 26) PANEL_BG = (25, 33, 46) PANEL_BORDER = (59, 76, 103) TXT = (235, 240, 245) GOOD = (86, 209, 124) WARN = (232, 188, 79) BAD = (225, 98, 98) class GameState(Enum): SPLASH = auto() MENU = auto() CONTROL = auto() JOY_CONFIG = auto() KEY_CONFIG = auto() class AsyncLoopThread: def __init__(self): self._loop = asyncio.new_event_loop() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def _run(self): asyncio.set_event_loop(self._loop) self._loop.run_forever() def run(self, coro): return asyncio.run_coroutine_threadsafe(coro, self._loop) def stop(self): self._loop.call_soon_threadsafe(self._loop.stop) self._thread.join(timeout=1.0) @dataclass class PlayerBindings: up: List[int] down: List[int] left: List[int] right: List[int] actions: List[int] start: List[int] select: List[int] @dataclass class PlayerInputState: up: bool = False down: bool = False left: bool = False left_pressed: bool = False right: bool = False right_pressed: bool = False actions_down: List[bool] = None actions_pressed: List[bool] = None start_down: bool = False start_pressed: bool = False select_down: bool = False select_pressed: bool = False def __post_init__(self): if self.actions_down is None: self.actions_down = [False] * 6 if self.actions_pressed is None: self.actions_pressed = [False] * 6 class InputManager: def __init__(self): self.bindings = { 0: PlayerBindings( up=[pygame.K_UP], down=[pygame.K_DOWN], left=[pygame.K_LEFT], right=[pygame.K_RIGHT], actions=[pygame.K_u, pygame.K_i, pygame.K_o, pygame.K_j, pygame.K_k, pygame.K_l], start=[pygame.K_RSHIFT], select=[pygame.K_RCTRL], ), 1: PlayerBindings( up=[pygame.K_z], down=[pygame.K_s], left=[pygame.K_q], right=[pygame.K_d], actions=[pygame.K_r, pygame.K_t, pygame.K_y, pygame.K_f, pygame.K_g, pygame.K_h], start=[pygame.K_RETURN], select=[pygame.K_BACKSPACE], ) , } self.default_bindings = { 0: PlayerBindings( up=[pygame.K_UP], down=[pygame.K_DOWN], left=[pygame.K_LEFT], right=[pygame.K_RIGHT], actions=[pygame.K_u, pygame.K_i, pygame.K_o, pygame.K_j, pygame.K_k, pygame.K_l], start=[pygame.K_RSHIFT], select=[pygame.K_RCTRL], ), 1: PlayerBindings( up=[pygame.K_z], down=[pygame.K_s], left=[pygame.K_q], right=[pygame.K_d], actions=[pygame.K_r, pygame.K_t, pygame.K_y, pygame.K_f, pygame.K_g, pygame.K_h], start=[pygame.K_RETURN], select=[pygame.K_BACKSPACE], ) } self.prev: Dict[int, PlayerInputState] = {0: PlayerInputState(), 1: PlayerInputState()} self.states: Dict[int, PlayerInputState] = {0: PlayerInputState(), 1: PlayerInputState()} self.joy_bindings = { 0: {"actions": [0, 1, 2, 3, 4, 5], "select": 6, "start": 7, "up": -1, "down": -1, "left": -1, "right": -1}, 1: {"actions": [0, 1, 2, 3, 4, 5], "select": 6, "start": 7, "up": -1, "down": -1, "left": -1, "right": -1}, } pygame.joystick.init() self.joysticks = [] self.refresh_joysticks() def refresh_joysticks(self): self.joysticks = [] for idx in range(pygame.joystick.get_count()): joy = pygame.joystick.Joystick(idx) joy.init() self.joysticks.append(joy) def _joy_for_player(self, player_idx: int): return self.joysticks[player_idx] if player_idx < len(self.joysticks) else None def player_for_instance_id(self, instance_id: int) -> Optional[int]: for player, joy in enumerate(self.joysticks): if joy.get_instance_id() == instance_id: return player return None def set_joy_binding(self, player: int, slot: str, button: int): if player not in self.joy_bindings: return if slot.startswith("A"): idx = int(slot[1:]) - 1 if 0 <= idx < 6: self.joy_bindings[player]["actions"][idx] = button elif slot == "START": self.joy_bindings[player]["start"] = button elif slot == "SELECT": self.joy_bindings[player]["select"] = button elif slot in ("UP", "DOWN", "LEFT", "RIGHT"): self.joy_bindings[player][slot.lower()] = button def reset_joy_bindings(self, player: int): self.joy_bindings[player] = { "actions": [0, 1, 2, 3, 4, 5], "select": 6, "start": 7, "up": -1, "down": -1, "left": -1, "right": -1, } def get_joy_bindings(self, player: int): return self.joy_bindings[player] def set_key_binding(self, player: int, slot: str, key: int): if player not in self.bindings: return b = self.bindings[player] if slot == "UP": b.up = [key] elif slot == "DOWN": b.down = [key] elif slot == "LEFT": b.left = [key] elif slot == "RIGHT": b.right = [key] elif slot.startswith("A"): idx = int(slot[1:]) - 1 if 0 <= idx < 6: b.actions[idx] = key elif slot == "START": b.start = [key] elif slot == "SELECT": b.select = [key] def reset_key_bindings(self, player: int): d = self.default_bindings[player] self.bindings[player] = PlayerBindings( up=list(d.up), down=list(d.down), left=list(d.left), right=list(d.right), actions=list(d.actions), start=list(d.start), select=list(d.select), ) def get_key_binding_rows(self, player: int): b = self.bindings[player] return [ ("UP", b.up[0]), ("DOWN", b.down[0]), ("LEFT", b.left[0]), ("RIGHT", b.right[0]), ("A1", b.actions[0]), ("A2", b.actions[1]), ("A3", b.actions[2]), ("A4", b.actions[3]), ("A5", b.actions[4]), ("A6", b.actions[5]), ("SELECT", b.select[0]), ("START", b.start[0]), ] def export_joy_bindings(self): return { "player1": self.joy_bindings[0], "player2": self.joy_bindings[1], } def export_key_bindings(self): out = {} for player, key in [(0, "player1"), (1, "player2")]: b = self.bindings[player] out[key] = { "up": b.up[0], "down": b.down[0], "left": b.left[0], "right": b.right[0], "actions": list(b.actions), "start": b.start[0], "select": b.select[0], } return out def import_joy_bindings(self, data: dict): for player, key in [(0, "player1"), (1, "player2")]: raw = data.get(key) if not isinstance(raw, dict): continue actions = raw.get("actions", []) if not isinstance(actions, list) or len(actions) != 6: continue normalized_actions = [] valid = True for b in actions: if not isinstance(b, int) or b < 0: valid = False break normalized_actions.append(b) if not valid: continue start = raw.get("start") select = raw.get("select") if not isinstance(start, int) or start < 0: continue if not isinstance(select, int) or select < 0: continue up_btn = raw.get("up", -1) down_btn = raw.get("down", -1) left_btn = raw.get("left", -1) right_btn = raw.get("right", -1) directions_are_valid = True for v in [up_btn, down_btn, left_btn, right_btn]: if not isinstance(v, int) or v < -1: directions_are_valid = False break if not directions_are_valid: continue self.joy_bindings[player] = { "actions": normalized_actions, "start": start, "select": select, "up": up_btn, "down": down_btn, "left": left_btn, "right": right_btn, } def import_key_bindings(self, data: dict): for player, key in [(0, "player1"), (1, "player2")]: raw = data.get(key) if not isinstance(raw, dict): continue actions = raw.get("actions") if not isinstance(actions, list) or len(actions) != 6: continue required = ["up", "down", "left", "right", "start", "select"] if any(not isinstance(raw.get(name), int) or raw.get(name) < 0 for name in required): continue if any(not isinstance(a, int) or a < 0 for a in actions): continue self.bindings[player] = PlayerBindings( up=[raw["up"]], down=[raw["down"]], left=[raw["left"]], right=[raw["right"]], actions=list(actions), start=[raw["start"]], select=[raw["select"]], ) @staticmethod def _key_any_pressed(keys, keycodes): return any(keys[k] for k in keycodes) def update(self): keys = pygame.key.get_pressed() self.prev = {p: self.states[p] for p in self.states} self.states = {0: PlayerInputState(), 1: PlayerInputState()} for player in [0, 1]: b = self.bindings[player] s = self.states[player] s.up = self._key_any_pressed(keys, b.up) s.down = self._key_any_pressed(keys, b.down) s.left = self._key_any_pressed(keys, b.left) s.right = self._key_any_pressed(keys, b.right) s.actions_down = [self._key_any_pressed(keys, [k]) for k in b.actions] s.start_down = self._key_any_pressed(keys, b.start) s.select_down = self._key_any_pressed(keys, b.select) joy = self._joy_for_player(player) if joy is not None: joy_map = self.joy_bindings[player] try: hat_x, hat_y = joy.get_hat(0) except Exception: hat_x, hat_y = 0, 0 axis_x = joy.get_axis(0) if joy.get_numaxes() > 0 else 0 axis_y = joy.get_axis(1) if joy.get_numaxes() > 1 else 0 s.up = s.up or hat_y > 0 or axis_y < -0.5 s.down = s.down or hat_y < 0 or axis_y > 0.5 s.left = s.left or hat_x < 0 or axis_x < -0.5 s.right = s.right or hat_x > 0 or axis_x > 0.5 up_btn = joy_map.get("up", -1) down_btn = joy_map.get("down", -1) left_btn = joy_map.get("left", -1) right_btn = joy_map.get("right", -1) if up_btn >= 0 and up_btn < joy.get_numbuttons() and joy.get_button(up_btn): s.up = True if down_btn >= 0 and down_btn < joy.get_numbuttons() and joy.get_button(down_btn): s.down = True if left_btn >= 0 and left_btn < joy.get_numbuttons() and joy.get_button(left_btn): s.left = True if right_btn >= 0 and right_btn < joy.get_numbuttons() and joy.get_button(right_btn): s.right = True for i, btn in enumerate(joy_map["actions"]): if btn < joy.get_numbuttons() and joy.get_button(btn): s.actions_down[i] = True start_btn = joy_map["start"] select_btn = joy_map["select"] if start_btn < joy.get_numbuttons() and joy.get_button(start_btn): s.start_down = True if select_btn < joy.get_numbuttons() and joy.get_button(select_btn): s.select_down = True prev = self.prev[player] s.left_pressed = s.left and not prev.left s.right_pressed = s.right and not prev.right s.actions_pressed = [now and not prev.actions_down[i] for i, now in enumerate(s.actions_down)] s.start_pressed = s.start_down and not prev.start_down s.select_pressed = s.select_down and not prev.select_down def state(self, player_idx: int) -> PlayerInputState: return self.states[player_idx] class TrainProxy: def __init__(self, loop_thread: AsyncLoopThread, address: Optional[str] = None): self.loop_thread = loop_thread self.address = address self.connected = False self.last_speed = 0 self.last_sent = 0.0 self.pending_speed: Optional[int] = None self.min_command_interval = 0.12 self._hub = DuploTrainHub(address=address) if HAS_DUPLO_LIB else None def set_address(self, address: Optional[str]): self.address = address if HAS_DUPLO_LIB: self._hub = DuploTrainHub(address=address) def _submit(self, coro): if not HAS_DUPLO_LIB: return None return self.loop_thread.run(coro) def connect(self, timeout=10.0) -> Tuple[bool, str]: if self.connected: return True, "déjà connecté" if not HAS_DUPLO_LIB: self.connected = True return True, "simulation (duploController non trouvé)" try: fut = self._submit(self._hub.connect()) fut.result(timeout=timeout) self.connected = True return True, "connecté" except Exception as exc: return False, f"erreur connexion: {exc}" def disconnect(self, timeout=5.0) -> Tuple[bool, str]: if not self.connected: return True, "déjà déconnecté" if not HAS_DUPLO_LIB: self.connected = False return True, "simulation déconnectée" try: fut = self._submit(self._hub.disconnect()) fut.result(timeout=timeout) self.connected = False self.last_speed = 0 return True, "déconnecté" except Exception as exc: return False, f"erreur déconnexion: {exc}" def set_speed(self, speed: int): speed = max(-100, min(100, speed)) if speed == self.last_speed and self.pending_speed is None: return self.pending_speed = speed self.flush_speed() def flush_speed(self): if self.pending_speed is None: return now = time.time() if now - self.last_sent < self.min_command_interval: return speed = self.pending_speed self.pending_speed = None self.last_speed = speed self.last_sent = now if not self.connected: return if not HAS_DUPLO_LIB: return self._submit(self._hub.set_motor_speed(speed)) def stop(self): self.pending_speed = None self.last_speed = 0 self.last_sent = time.time() if not self.connected: return if not HAS_DUPLO_LIB: return self._submit(self._hub.stop()) def play_sound(self, sound): if not self.connected or not HAS_DUPLO_LIB: return self._submit(self._hub.play_sound(sound)) def set_light(self, color): if not self.connected or not HAS_DUPLO_LIB: return self._submit(self._hub.change_light_color(color)) class DuploGame: def __init__(self): pygame.init() pygame.display.set_caption("Duplo Arcade Controller") self.screen = pygame.display.set_mode((WIDTH, HEIGHT)) self.clock = pygame.time.Clock() self.title_font = pygame.font.SysFont("arial", 56, bold=True) self.big_font = pygame.font.SysFont("arial", 36, bold=True) self.font = pygame.font.SysFont("arial", 24) self.small_font = pygame.font.SysFont("arial", 20) self.splash_image = self._load_splash_image() self.state = GameState.SPLASH self.splash_started = time.time() self.running = True self.frame_events = [] self.input = InputManager() self.joy_config_path = Path("joystick_mappings.json") self.key_config_path = Path("keyboard_mappings.json") self.ble_loop = AsyncLoopThread() self.player_count = 1 self.trains = [TrainProxy(self.ble_loop), TrainProxy(self.ble_loop)] self.addresses = ["", ""] self.status_lines = ["appuyer ACTION 1 pour connecter", "disponible si mode 2 joueurs"] self.speed_targets = [0, 0] self.color_index = [0, 0] self.speed_adjust_interval = 0.08 self.speed_adjust_step = 4 self.last_speed_adjust = [0.0, 0.0] self.config_player = 0 self.config_slot_idx = 0 self.config_slots = ["UP", "DOWN", "LEFT", "RIGHT", "A1", "A2", "A3", "A4", "A5", "A6", "SELECT", "START"] self.awaiting_joy_button = False self.config_message = "ACTION 1 pour modifier, SELECT pour reset joueur, START pour retour menu" self.key_config_player = 0 self.key_config_slot_idx = 0 self.key_config_slots = ["UP", "DOWN", "LEFT", "RIGHT", "A1", "A2", "A3", "A4", "A5", "A6", "SELECT", "START"] self.awaiting_key_press = False self.key_config_message = "ACTION 1 pour modifier, SELECT pour reset joueur, START pour retour menu" self.discovered_trains: List[Dict[str, object]] = [] self.selected_train_index = [0, 0] self.scan_attempted = False self._load_joy_mappings() self._load_key_mappings() self.colors = [] if HAS_DUPLO_LIB: self.colors = [ DuploColor.PINK, DuploColor.PURPLE, DuploColor.BLUE, DuploColor.LIGHTBLUE, DuploColor.CYAN, DuploColor.GREEN, DuploColor.YELLOW, DuploColor.ORANGE, DuploColor.RED, DuploColor.WHITE, ] def run(self): while self.running: self._handle_events() self.input.update() if self.state == GameState.SPLASH: self._update_splash() self._draw_splash() elif self.state == GameState.MENU: self._update_menu() self._draw_menu() elif self.state == GameState.CONTROL: self._update_control() self._draw_control() elif self.state == GameState.JOY_CONFIG: self._update_joy_config() self._draw_joy_config() elif self.state == GameState.KEY_CONFIG: self._update_key_config() self._draw_key_config() pygame.display.flip() self.clock.tick(FPS) self._shutdown() def _handle_events(self): self.frame_events = pygame.event.get() for event in self.frame_events: if event.type == pygame.QUIT: self.running = False elif event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE: self.running = False elif event.type == pygame.KEYDOWN and event.key == pygame.K_c and self.state == GameState.MENU: self.state = GameState.JOY_CONFIG self.awaiting_joy_button = False self.config_message = "Config joystick: ACTION 1 pour modifier" elif event.type == pygame.KEYDOWN and event.key == pygame.K_k and self.state == GameState.MENU: self.state = GameState.KEY_CONFIG self.awaiting_key_press = False self.key_config_message = "Config clavier: ACTION 1 pour modifier" elif event.type in (pygame.JOYDEVICEADDED, pygame.JOYDEVICEREMOVED): self.input.refresh_joysticks() def _update_splash(self): elapsed = time.time() - self.splash_started p0 = self.input.state(0) p1 = self.input.state(1) if elapsed >= SPLASH_DURATION or p0.start_pressed or p1.start_pressed or p0.actions_pressed[0]: self.state = GameState.MENU def _update_menu(self): p0 = self.input.state(0) p1 = self.input.state(1) if not self.scan_attempted: self._scan_available_trains() any_up = p0.up or p1.up any_down = p0.down or p1.down if any_up: self.player_count = 1 if any_down: self.player_count = 2 if p0.left_pressed: self._cycle_selected_train(0, -1) if p0.right_pressed: self._cycle_selected_train(0, +1) if self.player_count == 2 and p1.left_pressed: self._cycle_selected_train(1, -1) if self.player_count == 2 and p1.right_pressed: self._cycle_selected_train(1, +1) if p0.actions_pressed[0]: self._connect_selected_or_toggle(0) if self.player_count == 2 and p1.actions_pressed[0]: self._connect_selected_or_toggle(1) if p0.start_pressed: required = 1 if self.player_count == 1 else 2 connected = sum(1 for i in range(self.player_count) if self.trains[i].connected) if connected >= required: self.state = GameState.CONTROL self.status_lines[0] = "contrôle actif" if self.player_count == 2: self.status_lines[1] = "contrôle actif" else: self.status_lines[0] = "connectez les trains requis avant START" if p0.select_pressed: self.state = GameState.JOY_CONFIG self.awaiting_joy_button = False self.config_message = "Config joystick: ACTION 1 pour modifier" if p0.actions_pressed[1]: self.state = GameState.KEY_CONFIG self.awaiting_key_press = False self.key_config_message = "Config clavier: ACTION 1 pour modifier" async def _discover_duplo_trains_async(self) -> List[Dict[str, object]]: if not HAS_BLEAK: return [] found = await BleakScanner.discover(timeout=4.0, return_adv=True) trains = [] for _, payload in found.items(): device, advertisement = payload manufacturer_data = getattr(advertisement, "manufacturer_data", {}) or {} lego_data = manufacturer_data.get(0x0397) if not isinstance(lego_data, (bytes, bytearray)): continue is_duplo = False if len(lego_data) >= 2 and lego_data[1] == 0x20: is_duplo = True elif len(lego_data) >= 1 and lego_data[0] == 0x20: is_duplo = True if not is_duplo: continue name = device.name or getattr(advertisement, "local_name", None) or "DUPLO Hub" trains.append( { "address": device.address, "name": name, "rssi": getattr(device, "rssi", None), } ) uniq = {} for train in trains: uniq[train["address"]] = train result = list(uniq.values()) result.sort(key=lambda item: item.get("rssi") if isinstance(item.get("rssi"), int) else -999, reverse=True) return result def _scan_available_trains(self): self.scan_attempted = True if not HAS_BLEAK: self.status_lines[0] = "BLE scanner indisponible (bleak introuvable)" return try: fut = self.ble_loop.run(self._discover_duplo_trains_async()) self.discovered_trains = fut.result(timeout=8.0) self.selected_train_index = [0, 0] if self.discovered_trains: self.status_lines[0] = f"{len(self.discovered_trains)} train(s) trouvé(s)" if self.player_count == 2 and len(self.discovered_trains) > 1: self.selected_train_index[1] = 1 else: self.status_lines[0] = "aucun train trouvé (vérifier alimentation/BLE)" except Exception as exc: self.discovered_trains = [] self.status_lines[0] = f"erreur scan BLE: {exc}" def _cycle_selected_train(self, player: int, delta: int): if not self.discovered_trains: return count = len(self.discovered_trains) self.selected_train_index[player] = (self.selected_train_index[player] + delta) % count def _selected_train_for_player(self, player: int) -> Optional[Dict[str, object]]: if not self.discovered_trains: return None idx = self.selected_train_index[player] % len(self.discovered_trains) return self.discovered_trains[idx] def _connect_selected_or_toggle(self, player: int): if player == 1 and self.player_count == 1: return train = self.trains[player] if train.connected: ok, msg = train.disconnect() self.status_lines[player] = ("OK: " if ok else "KO: ") + msg return if not self.discovered_trains: self._scan_available_trains() if not self.discovered_trains: self.status_lines[player] = "KO: aucun train disponible" return chosen = self._selected_train_for_player(player) if chosen is None: self.status_lines[player] = "KO: sélection invalide" return chosen_addr = chosen["address"] other = 1 - player if self.trains[other].connected and self.trains[other].address == chosen_addr: self.status_lines[player] = "KO: ce train est déjà utilisé par l'autre joueur" return train.set_address(chosen_addr) ok, msg = train.connect() train_name = chosen.get("name", "DUPLO Hub") self.status_lines[player] = ("OK: " if ok else "KO: ") + f"{train_name} {msg}" def _toggle_connect(self, player: int): if player == 1 and self.player_count == 1: return train = self.trains[player] if train.connected: ok, msg = train.disconnect() else: ok, msg = train.connect() self.status_lines[player] = ("OK: " if ok else "KO: ") + msg def _sound_for_action(self, idx: int): if not HAS_DUPLO_LIB: return None sounds = [ DuploSound.HORN, DuploSound.BRAKE, DuploSound.STEAM, DuploSound.STATION_DEPARTURE, DuploSound.WATER_REFILL, None, ] return sounds[idx] def _update_control(self): now = time.time() for player in range(self.player_count): state = self.input.state(player) train = self.trains[player] delta = 0 if state.up: delta += 2 if state.down: delta -= 2 if delta and now - self.last_speed_adjust[player] >= self.speed_adjust_interval: step = self.speed_adjust_step if delta > 0 else -self.speed_adjust_step self.speed_targets[player] = max(-100, min(100, self.speed_targets[player] + step)) train.set_speed(self.speed_targets[player]) self.last_speed_adjust[player] = now train.flush_speed() if state.actions_pressed[5]: self.speed_targets[player] = 0 train.stop() for action_idx in range(5): if state.actions_pressed[action_idx]: sound = self._sound_for_action(action_idx) if sound is not None: train.play_sound(sound) if state.left_pressed and self.colors: self.color_index[player] = (self.color_index[player] - 1) % len(self.colors) train.set_light(self.colors[self.color_index[player]]) if state.right_pressed and self.colors: self.color_index[player] = (self.color_index[player] + 1) % len(self.colors) train.set_light(self.colors[self.color_index[player]]) if state.select_pressed: self.speed_targets[player] = 0 train.stop() if state.start_pressed: self.state = GameState.MENU self.status_lines[player] = "retour menu" def _update_joy_config(self): p0 = self.input.state(0) if p0.start_pressed and not self.awaiting_joy_button: self.state = GameState.MENU self.config_message = "Retour menu" return if not self.awaiting_joy_button: if p0.left: self.config_player = 0 if p0.right: self.config_player = 1 if p0.up: self.config_slot_idx = (self.config_slot_idx - 1) % len(self.config_slots) if p0.down: self.config_slot_idx = (self.config_slot_idx + 1) % len(self.config_slots) if p0.actions_pressed[0]: self.awaiting_joy_button = True self.config_message = f"J{self.config_player + 1} {self.config_slots[self.config_slot_idx]}: appuyez sur un bouton joystick" elif p0.select_pressed: self.input.reset_joy_bindings(self.config_player) self._save_joy_mappings() self.config_message = f"J{self.config_player + 1}: mapping joystick remis par défaut" return if p0.select_pressed: self.awaiting_joy_button = False self.config_message = "Capture annulée" return expected_player = self.config_player slot = self.config_slots[self.config_slot_idx] for event in self.frame_events: if event.type != pygame.JOYBUTTONDOWN: continue event_player = self.input.player_for_instance_id(event.instance_id) if event_player is None: continue if event_player != expected_player: self.config_message = f"Bouton ignoré: joystick joueur {event_player + 1}, attendu joueur {expected_player + 1}" continue self.input.set_joy_binding(expected_player, slot, event.button) self._save_joy_mappings() self.awaiting_joy_button = False self.config_message = f"J{expected_player + 1} {slot} = bouton {event.button}" break def _update_key_config(self): p0 = self.input.state(0) if p0.start_pressed and not self.awaiting_key_press: self.state = GameState.MENU self.key_config_message = "Retour menu" return if not self.awaiting_key_press: if p0.left: self.key_config_player = 0 if p0.right: self.key_config_player = 1 if p0.up: self.key_config_slot_idx = (self.key_config_slot_idx - 1) % len(self.key_config_slots) if p0.down: self.key_config_slot_idx = (self.key_config_slot_idx + 1) % len(self.key_config_slots) if p0.actions_pressed[0]: self.awaiting_key_press = True slot = self.key_config_slots[self.key_config_slot_idx] self.key_config_message = f"J{self.key_config_player + 1} {slot}: appuyez sur une touche clavier" elif p0.select_pressed: self.input.reset_key_bindings(self.key_config_player) self._save_key_mappings() self.key_config_message = f"J{self.key_config_player + 1}: mapping clavier remis par défaut" return if p0.select_pressed: self.awaiting_key_press = False self.key_config_message = "Capture annulée" return expected_player = self.key_config_player slot = self.key_config_slots[self.key_config_slot_idx] for event in self.frame_events: if event.type != pygame.KEYDOWN: continue if event.key == pygame.K_ESCAPE: continue self.input.set_key_binding(expected_player, slot, event.key) self._save_key_mappings() self.awaiting_key_press = False key_name = pygame.key.name(event.key) self.key_config_message = f"J{expected_player + 1} {slot} = {key_name}" break def _save_joy_mappings(self): try: payload = self.input.export_joy_bindings() with self.joy_config_path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, indent=2) except Exception as exc: self.config_message = f"Erreur sauvegarde mappings: {exc}" def _load_joy_mappings(self): if not self.joy_config_path.exists(): return try: with self.joy_config_path.open("r", encoding="utf-8") as handle: data = json.load(handle) if isinstance(data, dict): self.input.import_joy_bindings(data) self.config_message = "Mappings joystick chargés depuis joystick_mappings.json" except Exception as exc: self.config_message = f"Erreur chargement mappings: {exc}" def _save_key_mappings(self): try: payload = self.input.export_key_bindings() with self.key_config_path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, indent=2) except Exception as exc: self.key_config_message = f"Erreur sauvegarde mappings clavier: {exc}" def _load_key_mappings(self): if not self.key_config_path.exists(): return try: with self.key_config_path.open("r", encoding="utf-8") as handle: data = json.load(handle) if isinstance(data, dict): self.input.import_key_bindings(data) self.key_config_message = "Mappings clavier chargés depuis keyboard_mappings.json" except Exception as exc: self.key_config_message = f"Erreur chargement mappings clavier: {exc}" def _draw_text(self, surface, text, font, color, x, y, center=False): s = font.render(text, True, color) r = s.get_rect() if center: r.center = (x, y) else: r.topleft = (x, y) surface.blit(s, r) def _load_splash_image(self): image_path = Path("img") / "train.png" if not image_path.exists(): return None try: image = pygame.image.load(image_path.as_posix()).convert_alpha() w, h = image.get_size() max_w = int(WIDTH * 0.55) max_h = int(HEIGHT * 0.45) scale = min(max_w / w, max_h / h) if scale < 1: image = pygame.transform.smoothscale(image, (int(w * scale), int(h * scale))) return image except Exception: return None def _draw_splash(self): self.screen.fill(BG) y_title = HEIGHT // 2 - 40 if self.splash_image is not None: img_rect = self.splash_image.get_rect(center=(WIDTH // 2, HEIGHT // 2 - 110)) self.screen.blit(self.splash_image, img_rect) y_title = HEIGHT // 2 + 120 self._draw_text(self.screen, "DUPLO ARCADE", self.title_font, TXT, WIDTH // 2, y_title, center=True) self._draw_text( self.screen, "Contrôle clavier/joystick - Appuyer START ou attendre", self.font, WARN, WIDTH // 2, y_title + 64, center=True, ) if not HAS_DUPLO_LIB: self._draw_text( self.screen, "duploController non trouvé: mode simulation actif", self.small_font, BAD, WIDTH // 2, y_title + 110, center=True, ) def _draw_menu(self): self.screen.fill(BG) self._draw_text(self.screen, "Connexion des trains", self.big_font, TXT, 40, 24) self._draw_text(self.screen, "UP=1 joueur | DOWN=2 joueurs", self.font, WARN, 40, 76) self._draw_text(self.screen, f"Mode actuel: {self.player_count} joueur(s)", self.font, TXT, 40, 110) self._draw_text( self.screen, "ACTION 1 sur chaque joueur: connecter/déconnecter | START P1: lancer", self.small_font, TXT, 40, 146, ) self._draw_text(self.screen, "SELECT P1 ou touche C: config joystick", self.small_font, WARN, 40, 172) self._draw_text(self.screen, "ACTION 2 P1 ou touche K: config clavier", self.small_font, WARN, 40, 198) self._draw_text(self.screen, "LEFT/RIGHT joueur: choisir train trouvé | ACTION 1: connecter", self.small_font, TXT, 40, 224) if self.discovered_trains: preview = [] for idx, train in enumerate(self.discovered_trains[:3]): mark = "*" if idx in [self.selected_train_index[0], self.selected_train_index[1]] else " " name = train.get("name", "DUPLO Hub") addr = train.get("address", "?") preview.append(f"{mark}{idx + 1}: {name} [{addr}]") self._draw_text(self.screen, " | ".join(preview), self.small_font, GOOD, 40, 252) else: self._draw_text(self.screen, "Scan BLE en cours ou aucun train trouvé", self.small_font, BAD, 40, 252) panel_w = (WIDTH - 80 - 20) // 2 for i in [0, 1]: x = 40 + i * (panel_w + 20) y = 280 active = i < self.player_count rect = pygame.Rect(x, y, panel_w, 300) pygame.draw.rect(self.screen, PANEL_BG, rect, border_radius=10) pygame.draw.rect(self.screen, PANEL_BORDER, rect, 2, border_radius=10) title = f"Train Joueur {i + 1}" self._draw_text(self.screen, title, self.font, TXT if active else (130, 130, 130), x + 16, y + 16) if not active: self._draw_text(self.screen, "Inactif (mode 1 joueur)", self.small_font, WARN, x + 16, y + 58) continue status = "Connecté" if self.trains[i].connected else "Déconnecté" status_color = GOOD if self.trains[i].connected else BAD self._draw_text(self.screen, f"État: {status}", self.font, status_color, x + 16, y + 58) self._draw_text(self.screen, self.status_lines[i], self.small_font, TXT, x + 16, y + 98) chosen = self._selected_train_for_player(i) if chosen is not None: chosen_name = chosen.get("name", "DUPLO Hub") chosen_addr = chosen.get("address", "?") self._draw_text(self.screen, f"Sélection: {chosen_name}", self.small_font, WARN, x + 16, y + 126) self._draw_text(self.screen, str(chosen_addr), self.small_font, WARN, x + 16, y + 146) else: self._draw_text(self.screen, "Sélection: aucun train", self.small_font, WARN, x + 16, y + 126) ctrl_help = [ "Directions: vitesse (+/-) et lumière (gauche/droite)", "Action1..5: sons train", "Action6 / Select: stop immédiat", "Start: retour menu en jeu", ] for n, line in enumerate(ctrl_help): self._draw_text(self.screen, line, self.small_font, (190, 203, 219), x + 16, y + 178 + 24 * n) def _draw_joy_config(self): self.screen.fill(BG) self._draw_text(self.screen, "Configuration Joystick", self.big_font, TXT, 40, 24) self._draw_text(self.screen, "LEFT/RIGHT: joueur | UP/DOWN: fonction | ACTION 1: remapper", self.small_font, WARN, 40, 78) self._draw_text(self.screen, "SELECT: reset joueur | START: retour menu", self.small_font, WARN, 40, 104) panel_w = (WIDTH - 80 - 20) // 2 for player in [0, 1]: x = 40 + player * (panel_w + 20) y = 150 rect = pygame.Rect(x, y, panel_w, 460) pygame.draw.rect(self.screen, PANEL_BG, rect, border_radius=12) pygame.draw.rect(self.screen, PANEL_BORDER, rect, 2, border_radius=12) selected_player = player == self.config_player color = GOOD if selected_player else TXT self._draw_text(self.screen, f"Joueur {player + 1}", self.font, color, x + 16, y + 14) joy = self.input._joy_for_player(player) if joy is None: self._draw_text(self.screen, "Aucun joystick détecté pour ce joueur", self.small_font, BAD, x + 16, y + 46) else: name = joy.get_name() self._draw_text(self.screen, f"Joystick: {name}", self.small_font, TXT, x + 16, y + 46) mapping = self.input.get_joy_bindings(player) rows = [ ("UP", mapping.get("up", -1)), ("DOWN", mapping.get("down", -1)), ("LEFT", mapping.get("left", -1)), ("RIGHT", mapping.get("right", -1)), ("A1", mapping["actions"][0]), ("A2", mapping["actions"][1]), ("A3", mapping["actions"][2]), ("A4", mapping["actions"][3]), ("A5", mapping["actions"][4]), ("A6", mapping["actions"][5]), ("SELECT", mapping["select"]), ("START", mapping["start"]), ] for idx, (slot, button) in enumerate(rows): line_y = y + 80 + idx * 30 is_selected_slot = selected_player and idx == self.config_slot_idx line_color = WARN if is_selected_slot else TXT prefix = "> " if is_selected_slot else " " button_label = "désactivé" if button < 0 else f"bouton {button}" self._draw_text(self.screen, f"{prefix}{slot}", self.small_font, line_color, x + 16, line_y) self._draw_text(self.screen, button_label, self.small_font, line_color, x + 150, line_y) msg_color = BAD if self.awaiting_joy_button else TXT self._draw_text(self.screen, self.config_message, self.small_font, msg_color, 40, HEIGHT - 46) def _draw_key_config(self): self.screen.fill(BG) self._draw_text(self.screen, "Configuration Clavier", self.big_font, TXT, 40, 24) self._draw_text(self.screen, "LEFT/RIGHT: joueur | UP/DOWN: fonction | ACTION 1: remapper", self.small_font, WARN, 40, 78) self._draw_text(self.screen, "SELECT: reset joueur | START: retour menu", self.small_font, WARN, 40, 104) panel_w = (WIDTH - 80 - 20) // 2 for player in [0, 1]: x = 40 + player * (panel_w + 20) y = 150 rect = pygame.Rect(x, y, panel_w, 460) pygame.draw.rect(self.screen, PANEL_BG, rect, border_radius=12) pygame.draw.rect(self.screen, PANEL_BORDER, rect, 2, border_radius=12) selected_player = player == self.key_config_player color = GOOD if selected_player else TXT self._draw_text(self.screen, f"Joueur {player + 1}", self.font, color, x + 16, y + 14) rows = self.input.get_key_binding_rows(player) for idx, (slot, key_value) in enumerate(rows): line_y = y + 60 + idx * 32 is_selected_slot = selected_player and idx == self.key_config_slot_idx line_color = WARN if is_selected_slot else TXT prefix = "> " if is_selected_slot else " " key_name = pygame.key.name(key_value) self._draw_text(self.screen, f"{prefix}{slot}", self.small_font, line_color, x + 16, line_y) self._draw_text(self.screen, key_name, self.small_font, line_color, x + 170, line_y) msg_color = BAD if self.awaiting_key_press else TXT self._draw_text(self.screen, self.key_config_message, self.small_font, msg_color, 40, HEIGHT - 46) def _draw_control(self): self.screen.fill(BG) panel_w = (WIDTH - 80 - 20) // 2 for i in [0, 1]: x = 40 + i * (panel_w + 20) y = 40 h = HEIGHT - 80 rect = pygame.Rect(x, y, panel_w, h) pygame.draw.rect(self.screen, PANEL_BG, rect, border_radius=12) pygame.draw.rect(self.screen, PANEL_BORDER, rect, 2, border_radius=12) active = i < self.player_count title_color = TXT if active else (130, 130, 130) self._draw_text(self.screen, f"Joueur {i + 1}", self.big_font, title_color, x + 20, y + 20) if not active: self._draw_text(self.screen, "Zone inactive", self.font, WARN, x + 20, y + 90) continue connected = self.trains[i].connected self._draw_text( self.screen, f"Train: {'connecté' if connected else 'déconnecté'}", self.font, GOOD if connected else BAD, x + 20, y + 90, ) speed = self.speed_targets[i] self._draw_text(self.screen, f"Vitesse cible: {speed}", self.font, TXT, x + 20, y + 130) bar_w = panel_w - 40 bar_h = 28 bar_x = x + 20 bar_y = y + 170 pygame.draw.rect(self.screen, (40, 51, 68), (bar_x, bar_y, bar_w, bar_h), border_radius=6) center_x = bar_x + bar_w // 2 pygame.draw.line(self.screen, (80, 95, 115), (center_x, bar_y), (center_x, bar_y + bar_h), 2) fill = int((abs(speed) / 100.0) * (bar_w // 2)) if speed >= 0: pygame.draw.rect(self.screen, GOOD, (center_x, bar_y + 4, fill, bar_h - 8), border_radius=5) else: pygame.draw.rect(self.screen, BAD, (center_x - fill, bar_y + 4, fill, bar_h - 8), border_radius=5) self._draw_text(self.screen, "UP/DOWN: accélérer/freiner", self.small_font, TXT, x + 20, y + 240) self._draw_text(self.screen, "LEFT/RIGHT: changer lumière", self.small_font, TXT, x + 20, y + 270) self._draw_text(self.screen, "A1..A5: sons | A6/Select: stop", self.small_font, TXT, x + 20, y + 300) self._draw_text(self.screen, "Start: retour menu", self.small_font, TXT, x + 20, y + 330) self._draw_text(self.screen, "ESC pour quitter", self.small_font, WARN, WIDTH - 170, HEIGHT - 35) def _shutdown(self): for train in self.trains: train.disconnect(timeout=2.0) self.ble_loop.stop() pygame.quit() if __name__ == "__main__": game = DuploGame() game.run()