You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
180 lines
5.8 KiB
180 lines
5.8 KiB
from __future__ import annotations |
|
|
|
import logging |
|
from pathlib import Path |
|
from typing import Dict |
|
|
|
from fastapi import Depends, FastAPI, HTTPException, status |
|
|
|
from .audio_player import AudioPlayer |
|
from .config_store import ConfigStore |
|
from .models import TriggerConfig, TriggerPatch |
|
from .security import make_auth_dependency |
|
from .serial_listener import SerialTriggerListener, TriggerLogInfo |
|
|
|
|
|
# Configure the root logger once, at import time.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    level=logging.INFO,
)

# Filesystem layout: BASE_DIR is the parent of the directory containing this file.
BASE_DIR = Path(__file__).resolve().parents[1]
DATA_DIR = BASE_DIR.joinpath("data")
MUSIC_DIR = DATA_DIR.joinpath("musiques")
CONF_PATH = DATA_DIR.joinpath("conf.json")

# Module-level singletons shared by the route handlers and the serial callback.
config_store = ConfigStore(CONF_PATH)
audio_player = AudioPlayer(MUSIC_DIR)
# Stays None until startup_event() creates the listener.
serial_listener: SerialTriggerListener | None = None

app = FastAPI(version="1.0.0", title="pySonnerie Backend")
|
|
|
|
|
def _current_config() -> Dict[str, object]:
    """Return the result of ``config_store.get_copy()``.

    Exists as a zero-argument callable so it can be handed to
    ``make_auth_dependency`` below.
    """
    snapshot = config_store.get_copy()
    return snapshot


# Dependency attached to every protected route via ``Depends(auth_required)``.
auth_required = make_auth_dependency(_current_config)
|
|
|
|
|
def _resolve_trigger(config: Dict[str, object], trigger_id: str) -> tuple[str, Dict[str, object]]: |
|
triggers = config.get("triggers", {}) |
|
if not isinstance(triggers, dict): |
|
raise KeyError(trigger_id) |
|
|
|
trigger = triggers.get(trigger_id) |
|
if isinstance(trigger, dict): |
|
return trigger_id, trigger |
|
|
|
# Accept serial payload mapped to trigger "type" in config when key differs. |
|
matches = [ |
|
(key, item) |
|
for key, item in triggers.items() |
|
if isinstance(item, dict) and item.get("type") == trigger_id |
|
] |
|
if len(matches) == 1: |
|
return matches[0] |
|
if len(matches) > 1: |
|
raise ValueError(f"Ambiguous trigger mapping for {trigger_id}") |
|
|
|
raise KeyError(trigger_id) |
|
|
|
|
|
def _play_from_trigger(trigger_id: str) -> Dict[str, object]:
    """Resolve ``trigger_id`` against the current config and start playback.

    Args:
        trigger_id: Identifier passed to ``_resolve_trigger``.

    Returns:
        The resolved trigger dict.

    Raises:
        KeyError: Propagated from ``_resolve_trigger``, or raised when the
            trigger has no ``"music_file"`` entry.
        ValueError: Propagated from ``_resolve_trigger``, or raised when
            ``"start_seconds"`` cannot be converted to ``float``.
    """
    _, entry = _resolve_trigger(config_store.get_copy(), trigger_id)

    track = entry["music_file"]
    # A missing, None or otherwise falsy start offset falls back to 0.0.
    offset = float(entry.get("start_seconds", 0.0) or 0.0)
    stop_at = entry.get("end_seconds")

    audio_player.play(music_file=track, start_seconds=offset, end_seconds=stop_at)
    return entry
|
|
|
|
|
@app.on_event("startup")
def startup_event() -> None:
    """Prepare the data directories and start the serial listener.

    Raises:
        RuntimeError: The configuration file at ``CONF_PATH`` does not exist.
    """
    global serial_listener

    for folder in (DATA_DIR, MUSIC_DIR):
        folder.mkdir(parents=True, exist_ok=True)

    if not CONF_PATH.exists():
        raise RuntimeError(f"Missing config file: {CONF_PATH}")

    listener = SerialTriggerListener(
        _current_config().get("serial", {}),
        _serial_callback,
    )
    # Publish the listener before starting it so shutdown_event() can see it.
    serial_listener = listener
    listener.start()
|
|
|
|
|
@app.on_event("shutdown")
def shutdown_event() -> None:
    """Stop audio playback and, if one was created, the serial listener.

    The listener is stopped in a ``finally`` clause so that an exception
    raised by ``audio_player.stop()`` cannot leave it running. Any such
    exception still propagates afterwards.
    """
    try:
        audio_player.stop()
    finally:
        # Explicit None check: the listener is None until startup_event() ran,
        # and the object's truthiness should not decide whether it is stopped.
        if serial_listener is not None:
            serial_listener.stop()
|
|
|
|
|
def _serial_callback(raw_message: str) -> TriggerLogInfo | None:
    """Handle one message passed in by the serial listener.

    The stripped message is resolved to a trigger and its music file is
    played. Any exception raised while resolving or playing is logged as a
    warning and swallowed.

    Args:
        raw_message: Message text; surrounding whitespace is stripped.

    Returns:
        A dict with the resolved trigger ``"key"`` and, when the trigger has
        a non-blank string ``"name"``, that ``"name"``. ``None`` on failure.
    """
    trigger_id = raw_message.strip()
    try:
        trigger_key, entry = _resolve_trigger(config_store.get_copy(), trigger_id)

        track = entry["music_file"]
        # A missing, None or otherwise falsy start offset falls back to 0.0.
        offset = float(entry.get("start_seconds", 0.0) or 0.0)
        stop_at = entry.get("end_seconds")
        audio_player.play(music_file=track, start_seconds=offset, end_seconds=stop_at)

        info: TriggerLogInfo = {"key": trigger_key}
        label = entry.get("name")
        if not isinstance(label, str):
            return info
        if label.strip():
            info["name"] = label
        return info
    except Exception as exc:  # noqa: BLE001
        logger = logging.getLogger("pysonnerie.api")
        logger.warning("Trigger %s failed: %s", trigger_id, exc)
        return None
|
|
|
|
|
@app.get("/api/health")
def health_check() -> Dict[str, str]:
    """Liveness probe: always answers ``{"status": "ok"}``; no auth dependency."""
    return dict(status="ok")
|
|
|
|
|
@app.get("/api/config", dependencies=[Depends(auth_required)])
def get_config() -> Dict[str, object]:
    """Return the result of ``config_store.get_copy()`` (auth required)."""
    snapshot = config_store.get_copy()
    return snapshot
|
|
|
|
|
@app.get("/api/triggers", dependencies=[Depends(auth_required)])
def list_triggers() -> Dict[str, object]:
    """Return the ``"triggers"`` entry of the current config (auth required).

    Falls back to an empty dict when the config has no ``"triggers"`` key.
    """
    return config_store.get_copy().get("triggers", {})
|
|
|
|
|
@app.put("/api/triggers/{trigger_id}", dependencies=[Depends(auth_required)])
def upsert_trigger(trigger_id: str, payload: TriggerConfig) -> Dict[str, object]:
    """Store ``payload`` under ``trigger_id`` (auth required).

    Args:
        trigger_id: Trigger key taken from the URL path.
        payload: Trigger definition; its ``type`` must equal ``trigger_id``.

    Returns:
        Whatever ``config_store.upsert_trigger`` returns.

    Raises:
        HTTPException: 400 when ``payload.type`` differs from ``trigger_id``.
    """
    if payload.type == trigger_id:
        return config_store.upsert_trigger(trigger_id, payload.model_dump())

    raise HTTPException(
        detail="trigger_id must match payload.type",
        status_code=status.HTTP_400_BAD_REQUEST,
    )
|
|
|
|
|
@app.patch("/api/triggers/{trigger_id}", dependencies=[Depends(auth_required)])
def patch_trigger(trigger_id: str, payload: TriggerPatch) -> Dict[str, object]:
    """Apply a partial update to an existing trigger (auth required).

    Only fields of ``payload`` whose value is not ``None`` are applied. The
    merged result is validated against ``TriggerConfig`` before anything is
    written.

    Args:
        trigger_id: Trigger key taken from the URL path.
        payload: Fields to change.

    Returns:
        Whatever ``config_store.patch_trigger`` returns.

    Raises:
        HTTPException: 400 when the payload carries no non-``None`` field or
            when the merged trigger fails ``TriggerConfig`` validation;
            404 when the trigger does not exist.
    """
    updates = {key: value for key, value in payload.model_dump().items() if value is not None}
    if not updates:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No fields to update")

    existing = config_store.get_copy().get("triggers", {}).get(trigger_id)
    if not existing:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger")

    candidate = {**existing, **updates}
    try:
        TriggerConfig.model_validate(candidate)
    except ValueError as exc:
        # Assumes TriggerConfig is a pydantic model, whose ValidationError
        # subclasses ValueError. Raised inside a handler it would otherwise
        # surface as a 500 instead of a client error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid trigger: {exc}",
        ) from exc

    try:
        return config_store.patch_trigger(trigger_id, updates)
    except KeyError:
        # The trigger may have disappeared between the lookup above and the write.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger") from None
|
|
|
|
|
@app.delete("/api/triggers/{trigger_id}", dependencies=[Depends(auth_required)])
def delete_trigger(trigger_id: str) -> Dict[str, str]:
    """Delete the trigger stored under ``trigger_id`` (auth required).

    Returns:
        ``{"status": "deleted"}`` on success.

    Raises:
        HTTPException: 404 when ``config_store.delete_trigger`` raises
            ``KeyError``.
    """
    try:
        config_store.delete_trigger(trigger_id)
    except KeyError:
        raise HTTPException(detail="Unknown trigger", status_code=status.HTTP_404_NOT_FOUND)
    else:
        return {"status": "deleted"}
|
|
|
|
|
@app.get("/api/play/{trigger_id}", dependencies=[Depends(auth_required)])
def force_play(trigger_id: str) -> Dict[str, str]:
    """Start playback for ``trigger_id`` on demand (auth required).

    Args:
        trigger_id: Trigger key, or ``"type"`` value, taken from the URL path.

    Returns:
        ``{"status": "playing"}`` once playback has been requested.

    Raises:
        HTTPException: 404 when ``_play_from_trigger`` raises ``KeyError``;
            409 when it raises ``ValueError`` (for example when the
            identifier matches several triggers by ``"type"``).
    """
    try:
        _play_from_trigger(trigger_id)
    except KeyError:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown trigger") from None
    except ValueError as exc:
        # Previously uncaught, so an ambiguous mapping surfaced as a 500.
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc
    return {"status": "playing"}
|
|
|
|
|
@app.get("/api/stop", dependencies=[Depends(auth_required)])
def stop_audio() -> Dict[str, str]:
    """Call ``audio_player.stop()`` and report ``{"status": "stopped"}`` (auth required)."""
    audio_player.stop()
    response = {"status": "stopped"}
    return response
|
|
|