"""HTTP API and serial-trigger wiring for the pySonnerie backend.

The module builds the FastAPI ``app``, exposes routes to read the
configuration, manage triggers and start/stop playback, and starts a
``SerialTriggerListener`` at application startup whose callback plays the
trigger matching each received message.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Dict

from fastapi import Depends, FastAPI, HTTPException, status

from .audio_player import AudioPlayer
from .config_store import ConfigStore
from .models import TriggerConfig, TriggerPatch
from .security import make_auth_dependency
from .serial_listener import SerialTriggerListener, TriggerLogInfo

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)

logger = logging.getLogger("pysonnerie.api")

BASE_DIR = Path(__file__).resolve().parents[1]
DATA_DIR = BASE_DIR / "data"
MUSIC_DIR = DATA_DIR / "musiques"
CONF_PATH = DATA_DIR / "conf.json"

config_store = ConfigStore(CONF_PATH)
audio_player = AudioPlayer(MUSIC_DIR)
serial_listener: SerialTriggerListener | None = None

app = FastAPI(title="pySonnerie Backend", version="1.0.0")


def _current_config() -> Dict[str, object]:
    """Return a copy of the current configuration from the config store."""
    return config_store.get_copy()


auth_required = make_auth_dependency(_current_config)


def _trigger_map(config: Dict[str, object]) -> Dict[str, object]:
    """Return the ``"triggers"`` section of *config*, or ``{}`` if it is not a dict."""
    triggers = config.get("triggers", {})
    return triggers if isinstance(triggers, dict) else {}


def _resolve_trigger(config: Dict[str, object], trigger_id: str) -> tuple[str, Dict[str, object]]:
    """Find the trigger designated by *trigger_id* in *config*.

    The identifier is first looked up as a key of ``config["triggers"]``. If
    no dict entry is stored under that key, it is matched against the
    ``"type"`` field of every trigger instead.

    Returns:
        A ``(key, trigger)`` pair, where *key* is the key under which the
        trigger is stored.

    Raises:
        KeyError: the triggers section is not a dict, or nothing matches.
        ValueError: several triggers share ``trigger_id`` as their type.
    """
    triggers = config.get("triggers", {})
    if not isinstance(triggers, dict):
        raise KeyError(trigger_id)

    trigger = triggers.get(trigger_id)
    if isinstance(trigger, dict):
        return trigger_id, trigger

    # Accept serial payload mapped to trigger "type" in config when key differs.
    matches = [
        (key, item)
        for key, item in triggers.items()
        if isinstance(item, dict) and item.get("type") == trigger_id
    ]
    if len(matches) == 1:
        return matches[0]
    if len(matches) > 1:
        raise ValueError(f"Ambiguous trigger mapping for {trigger_id}")
    raise KeyError(trigger_id)


def _play_trigger(trigger: Dict[str, object]) -> None:
    """Start playback with the settings stored in *trigger*.

    Missing or null numeric settings fall back to their defaults (0.0 for
    the offsets and fades, 80 for the volume).

    Raises:
        ValueError: the trigger has no ``music_file``. A ``KeyError`` is
            deliberately avoided here so callers can keep using it to mean
            "unknown trigger".
    """
    music_file = trigger.get("music_file")
    if not music_file:
        raise ValueError("Trigger has no music_file configured")

    volume = trigger.get("volume")
    audio_player.play(
        music_file=music_file,
        start_seconds=float(trigger.get("start_seconds", 0.0) or 0.0),
        end_seconds=trigger.get("end_seconds"),
        # Only None means "use the default"; a volume of 0 must stay 0.
        volume=80 if volume is None else int(volume),
        fade_in_seconds=float(trigger.get("fade_in_seconds", 0.0) or 0.0),
        fade_out_seconds=float(trigger.get("fade_out_seconds", 0.0) or 0.0),
        normalize_audio=bool(trigger.get("normalize_audio", False)),
    )


def _play_from_trigger(trigger_id: str) -> Dict[str, object]:
    """Resolve *trigger_id* against the current config, play it and return it.

    Raises:
        KeyError: no trigger matches *trigger_id*.
        ValueError: the mapping is ambiguous or the trigger has no music file.
    """
    _, trigger = _resolve_trigger(config_store.get_copy(), trigger_id)
    _play_trigger(trigger)
    return trigger


def _normalize_text(value: object) -> str:
    """Return *value* as a stripped, lower-cased string (``""`` for falsy values)."""
    return str(value or "").strip().lower()


def _ensure_unique_trigger_identity(
    triggers: Dict[str, object],
    current_id: str,
    candidate_type: str,
    candidate_name: str,
) -> None:
    """Reject a trigger whose type or name is already used by another trigger.

    The entry stored under *current_id* and non-dict entries are ignored.
    Types are compared exactly; names are compared after stripping and
    lower-casing. An empty candidate name is never treated as a duplicate,
    so several unnamed triggers can coexist.

    Raises:
        HTTPException: 400 when the type or the name is already taken.
    """
    wanted_name = _normalize_text(candidate_name)
    for existing_id, existing_trigger in triggers.items():
        if existing_id == current_id or not isinstance(existing_trigger, dict):
            continue
        if existing_trigger.get("type") == candidate_type:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Trigger type already used",
            )
        if wanted_name and _normalize_text(existing_trigger.get("name")) == wanted_name:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Trigger name already used",
            )


@app.on_event("startup")
def startup_event() -> None:
    """Create the data directories and start the serial listener.

    Raises:
        RuntimeError: the configuration file does not exist.
    """
    global serial_listener
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    MUSIC_DIR.mkdir(parents=True, exist_ok=True)
    if not CONF_PATH.exists():
        raise RuntimeError(f"Missing config file: {CONF_PATH}")

    serial_conf = _current_config().get("serial", {})
    serial_listener = SerialTriggerListener(serial_conf, _serial_callback)
    serial_listener.start()


@app.on_event("shutdown")
def shutdown_event() -> None:
    """Stop playback and, if one was started, the serial listener."""
    audio_player.stop()
    if serial_listener is not None:
        serial_listener.stop()


def _serial_callback(raw_message: str) -> TriggerLogInfo | None:
    """Play the trigger designated by a message received from the serial listener.

    Returns:
        A dict holding the resolved trigger ``"key"`` and, when the trigger
        has a non-blank string name, its ``"name"``; ``None`` when the
        trigger could not be resolved or played.
    """
    trigger_id = raw_message.strip()
    try:
        trigger_key, trigger = _resolve_trigger(config_store.get_copy(), trigger_id)
        _play_trigger(trigger)
    except Exception as exc:  # noqa: BLE001
        # Any failure is logged and reported as None rather than raised to
        # the listener that invoked this callback.
        logger.warning("Trigger %s failed: %s", trigger_id, exc)
        return None

    info: TriggerLogInfo = {"key": trigger_key}
    name = trigger.get("name")
    if isinstance(name, str) and name.strip():
        info["name"] = name
    return info


@app.get("/api/health")
def health_check() -> Dict[str, str]:
    """Report that the service is up (no authentication dependency)."""
    return {"status": "ok"}


@app.get("/api/config", dependencies=[Depends(auth_required)])
def get_config() -> Dict[str, object]:
    """Return a copy of the whole configuration."""
    return config_store.get_copy()


@app.get("/api/triggers", dependencies=[Depends(auth_required)])
def list_triggers() -> Dict[str, object]:
    """Return the ``"triggers"`` section of the configuration (``{}`` if absent)."""
    config = config_store.get_copy()
    return config.get("triggers", {})


@app.put("/api/triggers/{trigger_id}", dependencies=[Depends(auth_required)])
def upsert_trigger(trigger_id: str, payload: TriggerConfig) -> Dict[str, object]:
    """Create or replace the trigger stored under *trigger_id*.

    Raises:
        HTTPException: 400 when another trigger already uses the same type
            or name.
    """
    triggers = _trigger_map(config_store.get_copy())
    _ensure_unique_trigger_identity(triggers, trigger_id, payload.type, payload.name)
    return config_store.upsert_trigger(trigger_id, payload.model_dump())


@app.patch("/api/triggers/{trigger_id}", dependencies=[Depends(auth_required)])
def patch_trigger(trigger_id: str, payload: TriggerPatch) -> Dict[str, object]:
    """Apply the non-null fields of *payload* to an existing trigger.

    The merged trigger is validated against ``TriggerConfig`` and checked for
    type/name uniqueness before the store is asked to apply the patch.

    Raises:
        HTTPException: 400 when the payload has no non-null field, when the
            merged trigger fails validation, or when its type or name is
            already used; 404 when the trigger does not exist.
    """
    updates = {key: value for key, value in payload.model_dump().items() if value is not None}
    if not updates:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No fields to update")

    triggers = _trigger_map(config_store.get_copy())
    existing = triggers.get(trigger_id)
    if not existing or not isinstance(existing, dict):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger")

    candidate = {**existing, **updates}
    try:
        TriggerConfig.model_validate(candidate)
    except ValueError as exc:
        # pydantic's ValidationError is a ValueError subclass; left uncaught
        # it would surface as a 500 instead of a client error.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    _ensure_unique_trigger_identity(
        triggers,
        trigger_id,
        str(candidate.get("type", "")),
        str(candidate.get("name", "")),
    )
    try:
        return config_store.patch_trigger(trigger_id, updates)
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger"
        ) from None


@app.delete("/api/triggers/{trigger_id}", dependencies=[Depends(auth_required)])
def delete_trigger(trigger_id: str) -> Dict[str, str]:
    """Delete the trigger stored under *trigger_id*.

    Raises:
        HTTPException: 404 when the store raises ``KeyError`` for that id.
    """
    try:
        config_store.delete_trigger(trigger_id)
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger"
        ) from None
    return {"status": "deleted"}


# NOTE(review): the two routes below change playback state but are exposed as
# GET; the method is left untouched so existing clients keep working.
@app.get("/api/play/{trigger_id}", dependencies=[Depends(auth_required)])
def force_play(trigger_id: str) -> Dict[str, str]:
    """Play the trigger designated by *trigger_id* (key or type).

    Raises:
        HTTPException: 404 when no trigger matches; 400 when resolving or
            playing raises ``FileNotFoundError``, ``ValueError`` or
            ``RuntimeError``.
    """
    try:
        _play_from_trigger(trigger_id)
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger"
        ) from None
    except (FileNotFoundError, ValueError, RuntimeError) as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    return {"status": "playing"}


@app.get("/api/stop", dependencies=[Depends(auth_required)])
def stop_audio() -> Dict[str, str]:
    """Stop the current playback."""
    audio_player.stop()
    return {"status": "stopped"}