from __future__ import annotations import re import shutil import subprocess import threading import uuid from pathlib import Path from flask import ( Blueprint, Response, current_app, flash, jsonify, redirect, render_template, request, send_file, session, url_for, ) from werkzeug.utils import secure_filename from .backend_client import BackendApiError, BackendClient, read_default_backend_url ui = Blueprint("ui", __name__, template_folder="templates") GPIO_PATTERN = re.compile(r"^GPIO\d+$") ALLOWED_AUDIO_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".aac", ".m4a"} YOUTUBE_PROGRESS_PATTERN = re.compile(r"(\d+(?:\.\d+)?)%") YOUTUBE_POSTPROCESS_HINTS = ( "extractaudio", "ffmpeg", "postprocess", "post-process", "merging", "deleting original file", "destination", ) INVALID_FILENAME_CHARS_PATTERN = re.compile(r"[<>:\"/\\|?*\x00-\x1f]") _YOUTUBE_JOBS_LOCK = threading.Lock() _YOUTUBE_JOBS: dict[str, dict[str, object]] = {} def _backend_client() -> BackendClient | None: backend_url = session.get("backend_url") username = session.get("backend_username") password = session.get("backend_password") if not backend_url or not username or not password: return None return BackendClient(str(backend_url), str(username), str(password)) def _ensure_login() -> BackendClient | Response: client = _backend_client() if client is None: flash("Veuillez vous connecter d'abord.", "error") return redirect(url_for("ui.login")) return client def _music_dir() -> Path: music_dir = current_app.config["MUSIC_DIR"] music_dir.mkdir(parents=True, exist_ok=True) return music_dir def _list_audio_files() -> list[str]: files: list[str] = [] for item in _music_dir().iterdir(): if item.is_file() and item.suffix.lower() in ALLOWED_AUDIO_EXTENSIONS: files.append(item.name) return sorted(files, key=str.lower) def _used_audio_files_from_triggers(triggers: dict[str, dict]) -> set[str]: used: set[str] = set() for trigger in triggers.values(): if not isinstance(trigger, dict): continue music_file = str(trigger.get("music_file", "")).strip() if music_file: used.add(Path(music_file).name.lower()) return used def _parse_optional_float(value: str) -> float | None: clean = value.strip() if clean == "": return None return float(clean) def _next_numeric_trigger_id(triggers: dict[str, dict]) -> str: max_id = 0 for key in triggers: if key.isdigit(): max_id = max(max_id, int(key)) return str(max_id + 1) def _normalize_key(value: str) -> str: return value.strip().lower() def _audio_redirect_target() -> str: if request.form.get("return_to", "").strip() == "audio_storage": return url_for("ui.audio_storage") return url_for("ui.dashboard") def _normalize_youtube_filename(raw_name: str) -> str | None: # Keep readable names (including spaces) but strip dangerous/path chars. candidate = Path(raw_name.strip()).name if candidate == "": return None stem = Path(candidate).stem stem = INVALID_FILENAME_CHARS_PATTERN.sub(" ", stem) stem = re.sub(r"\s+", " ", stem).strip(" .") if stem == "": return None return f"{stem}.mp3" def _next_available_filename(filename: str, existing_names: set[str]) -> str: candidate = filename stem = Path(filename).stem suffix = Path(filename).suffix or ".mp3" index = 2 while candidate.lower() in existing_names: candidate = f"{stem}_{index}{suffix}" index += 1 return candidate def _build_youtube_download_command(youtube_url: str, requested_name: str, music_dir: Path) -> tuple[list[str] | None, str | None]: yt_dlp_path = shutil.which("yt-dlp") if yt_dlp_path is None: return None, "yt-dlp est introuvable sur le système." if shutil.which("node") is None: return None, "node est requis pour l'option --js-runtimes node." command = [ yt_dlp_path, "--js-runtimes", "node", "--remote-components", "ejs:github", "--newline", "-x", "--audio-format", "mp3", ] if requested_name: normalized_name = _normalize_youtube_filename(requested_name) if normalized_name is None: return None, "Nom de fichier YouTube invalide." existing_names = {item.name.lower() for item in music_dir.iterdir() if item.is_file()} if normalized_name.lower() in existing_names: return None, "Un fichier avec ce nom existe déjà." output_template = music_dir / f"{Path(normalized_name).stem}.%(ext)s" command.extend(["-o", str(output_template)]) else: command.extend(["-P", str(music_dir)]) command.append(youtube_url) return command, None def _extract_progress_percent(line: str) -> float | None: match = YOUTUBE_PROGRESS_PATTERN.search(line) if match is None: return None try: value = float(match.group(1)) except ValueError: return None return max(0.0, min(100.0, value)) def _set_youtube_job_state(job_id: str, **updates: object) -> None: with _YOUTUBE_JOBS_LOCK: current = _YOUTUBE_JOBS.get(job_id) if current is None: return current.update(updates) def _create_youtube_job() -> str: job_id = uuid.uuid4().hex with _YOUTUBE_JOBS_LOCK: _YOUTUBE_JOBS[job_id] = { "status": "running", "percent": 0.0, "message": "Préparation du téléchargement...", } return job_id def _run_youtube_download_job(job_id: str, command: list[str]) -> None: try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) except OSError as exc: _set_youtube_job_state(job_id, status="error", message=f"Impossible d'exécuter yt-dlp: {exc}") return last_line = "" max_percent = 0.0 in_postprocess = False output = process.stdout if output is not None: for raw_line in output: line = raw_line.strip() if line == "": continue last_line = line lower_line = line.lower() if any(hint in lower_line for hint in YOUTUBE_POSTPROCESS_HINTS): in_postprocess = True shown_percent = max(max_percent, 99.0) _set_youtube_job_state( job_id, status="running", percent=shown_percent, message="Conversion audio en cours...", ) progress = _extract_progress_percent(line) if progress is None: continue if progress > max_percent: max_percent = progress shown_percent = min(max_percent, 99.0) if shown_percent >= 99.0: in_postprocess = True message = ( "Conversion audio en cours..." if in_postprocess else f"Téléchargement en cours: {shown_percent:.1f}%" ) _set_youtube_job_state( job_id, status="running", percent=shown_percent, message=message, ) return_code = process.wait() if return_code == 0: _set_youtube_job_state( job_id, status="completed", percent=100.0, message="Audio YouTube extrait et ajouté au stockage.", ) return detail = last_line if last_line else "Erreur inconnue" _set_youtube_job_state(job_id, status="error", message=f"Échec extraction YouTube: {detail}") def _trigger_sort_key(item: tuple[str, dict]) -> tuple[int, int | str]: trigger_id = item[0] if trigger_id.isdigit(): return (0, int(trigger_id)) return (1, trigger_id.lower()) @ui.get("/") def home() -> Response: if _backend_client() is None: return redirect(url_for("ui.login")) return redirect(url_for("ui.dashboard")) @ui.route("/login", methods=["GET", "POST"]) def login() -> str | Response: project_root: Path = current_app.config["PROJECT_ROOT"] default_url = read_default_backend_url(project_root) if request.method == "POST": backend_url = default_url username = request.form.get("username", "").strip() password = request.form.get("password", "") session["backend_url"] = backend_url session["backend_username"] = username session["backend_password"] = password client = _backend_client() assert client is not None try: client.health() client.list_triggers() except BackendApiError: return render_template( "login.html", login_error="Impossible de se connecter au service pour le moment. Vérifiez que le backend est demarré puis réessayez.", attempted_username=username, ) flash("Connexion au backend reussie.", "success") return redirect(url_for("ui.dashboard")) return render_template("login.html", login_error=None, attempted_username="") @ui.get("/logout") def logout() -> Response: session.clear() flash("Session fermee.", "info") return redirect(url_for("ui.login")) @ui.get("/dashboard") def dashboard() -> str | Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return client_or_redirect client = client_or_redirect triggers: dict[str, dict] = {} used_audio_files: set[str] = set() try: raw = client.list_triggers() valid_triggers = {k: v for k, v in raw.items() if isinstance(v, dict)} triggers = dict(sorted(valid_triggers.items(), key=_trigger_sort_key)) used_audio_files = _used_audio_files_from_triggers(valid_triggers) except BackendApiError as exc: flash(f"Impossible de charger les triggers: {exc}", "error") audio_files = _list_audio_files() return render_template( "dashboard.html", triggers=triggers, audio_files=audio_files, used_audio_files=used_audio_files, backend_url=session.get("backend_url", ""), username=session.get("backend_username", ""), ) @ui.get("/audio-storage") def audio_storage() -> str | Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return client_or_redirect client = client_or_redirect audio_files = _list_audio_files() used_audio_files: set[str] = set() try: raw = client.list_triggers() valid_triggers = {k: v for k, v in raw.items() if isinstance(v, dict)} used_audio_files = _used_audio_files_from_triggers(valid_triggers) except BackendApiError as exc: flash(f"Impossible de charger les triggers: {exc}", "error") return render_template("audio_storage.html", audio_files=audio_files, used_audio_files=used_audio_files) @ui.post("/trigger/save") def save_trigger() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": "Non connecte"}), 401 return client_or_redirect client = client_or_redirect original_id = request.form.get("original_id", "").strip() trigger_type = request.form.get("type", "").strip() name = request.form.get("name", "").strip() music_file = request.form.get("music_file", "").strip() start_raw = request.form.get("start_seconds", "0") end_raw = request.form.get("end_seconds", "") volume_raw = request.form.get("volume", "80") fade_in_raw = request.form.get("fade_in_seconds", "0") fade_out_raw = request.form.get("fade_out_seconds", "0") repeat_count_raw = request.form.get("repeat_count", "0") normalize_audio = request.form.get("normalize_audio") in {"on", "true", "1"} def _err(msg: str) -> Response: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": msg}), 400 flash(msg, "error") return redirect(url_for("ui.dashboard")) if not GPIO_PATTERN.match(trigger_type): return _err("Le type doit respecter le format GPIO.") if not name or not music_file: return _err("Le nom et le fichier audio sont obligatoires.") try: start_seconds = float(start_raw) end_seconds = _parse_optional_float(end_raw) volume = int(volume_raw) fade_in_seconds = float(fade_in_raw) fade_out_seconds = float(fade_out_raw) repeat_count = int(repeat_count_raw) except ValueError: return _err("Les temps de debut/fin, fondus, repetitions et le volume doivent être numériques.") if start_seconds < 0 or (end_seconds is not None and end_seconds <= start_seconds): return _err("Fenêtre temporelle invalide.") if not (0 <= volume <= 100): return _err("Le volume doit être compris entre 0 et 100.") if fade_in_seconds < 0 or fade_out_seconds < 0: return _err("Les durées de fondu doivent être supérieures ou égales à 0.") if repeat_count < 0: return _err("Le nombre de répétitions doit être supérieur ou égal à 0.") payload = { "name": name, "type": trigger_type, "music_file": music_file, "start_seconds": start_seconds, "end_seconds": end_seconds, "volume": volume, "fade_in_seconds": fade_in_seconds, "fade_out_seconds": fade_out_seconds, "repeat_count": repeat_count, "normalize_audio": normalize_audio, } try: existing_raw = client.list_triggers() existing_triggers = {k: v for k, v in existing_raw.items() if isinstance(v, dict)} if original_id and original_id in existing_triggers: trigger_id = original_id else: trigger_id = _next_numeric_trigger_id(existing_triggers) for existing_id, existing_trigger in existing_triggers.items(): if existing_id == trigger_id: continue if existing_trigger.get("type") == trigger_type: return _err("Ce type de trigger est déjà utilisé.") if _normalize_key(str(existing_trigger.get("name", ""))) == _normalize_key(name): return _err("Ce nom de trigger est déjà utilisé.") client.upsert_trigger(trigger_id, payload) except BackendApiError as exc: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": str(exc)}), 502 flash(f"Echec d'enregistrement du trigger: {exc}", "error") return redirect(url_for("ui.dashboard")) if request.headers.get("X-Requested-With") == "fetch": return jsonify({ "ok": True, "message": f"Trigger \"{name}\" enregistré.", "trigger_id": trigger_id, "original_id": original_id or trigger_id, "trigger": payload, }) flash(f"Trigger \"{name}\" enregistré.", "success") return redirect(url_for("ui.dashboard")) @ui.post("/trigger/delete") def delete_trigger() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": "Non connecte"}), 401 return client_or_redirect client = client_or_redirect trigger_id = request.form.get("trigger_id", "").strip() if not trigger_id: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": "Identifiant du trigger manquant."}), 400 flash("Identifiant du trigger manquant.", "error") return redirect(url_for("ui.dashboard")) try: client.delete_trigger(trigger_id) if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": True, "message": "Trigger supprimé.", "trigger_id": trigger_id}) flash("Trigger supprimé.", "success") except BackendApiError as exc: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": f"Echec de suppression: {exc}"}), 500 flash(f"Echec de suppression: {exc}", "error") return redirect(url_for("ui.dashboard")) @ui.post("/trigger/play") def play_trigger() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": "Non connecte"}), 401 return client_or_redirect client = client_or_redirect trigger_id = request.form.get("trigger_id", "").strip() if not trigger_id: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": "Identifiant du trigger manquant."}), 400 flash("Identifiant du trigger manquant.", "error") return redirect(url_for("ui.dashboard")) try: client.play_trigger(trigger_id) if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": True, "message": "Trigger démarré."}) flash("Trigger démarré.", "success") except BackendApiError as exc: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": str(exc)}), 502 flash(f"Echec du lancement: {exc}", "error") return redirect(url_for("ui.dashboard")) @ui.post("/audio/stop") def stop_audio() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": "Non connecte"}), 401 return client_or_redirect client = client_or_redirect try: client.stop_audio() if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": True, "message": "Audio arrete."}) flash("Audio arrete.", "info") except BackendApiError as exc: if request.headers.get("X-Requested-With") == "fetch": return jsonify({"ok": False, "error": str(exc)}), 502 flash(f"Echec de l'arrêt audio: {exc}", "error") return redirect(url_for("ui.dashboard")) @ui.post("/audio/upload") def upload_audio() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return client_or_redirect youtube_url = request.form.get("youtube_url", "").strip() if youtube_url: requested_name = request.form.get("youtube_filename", "").strip() command, command_error = _build_youtube_download_command(youtube_url, requested_name, _music_dir()) if command is None: flash(command_error or "Impossible de préparer la commande yt-dlp.", "error") return redirect(_audio_redirect_target()) try: result = subprocess.run(command, check=False, capture_output=True, text=True) except OSError as exc: flash(f"Impossible d'exécuter yt-dlp: {exc}", "error") return redirect(_audio_redirect_target()) if result.returncode != 0: detail = (result.stderr or result.stdout or "").strip().splitlines() message = detail[-1] if detail else "Erreur inconnue" flash(f"Échec extraction YouTube: {message}", "error") return redirect(_audio_redirect_target()) flash("Audio YouTube extrait et ajouté au stockage.", "success") return redirect(_audio_redirect_target()) audio = request.files.get("audio_file") if audio is None or audio.filename is None or audio.filename.strip() == "": flash("Sélectionnez d'abord un fichier.", "error") return redirect(_audio_redirect_target()) filename = secure_filename(audio.filename) if filename == "": flash("Nom de fichier invalide.", "error") return redirect(_audio_redirect_target()) ext = Path(filename).suffix.lower() if ext not in ALLOWED_AUDIO_EXTENSIONS: flash("Format audio non supporté.", "error") return redirect(_audio_redirect_target()) existing_names = {item.name.lower() for item in _music_dir().iterdir() if item.is_file()} if filename.lower() in existing_names: flash("Un fichier avec ce nom existe déjà.", "error") return redirect(_audio_redirect_target()) destination = _music_dir() / filename audio.save(destination) flash(f"Fichier {filename} téléversé.", "success") return redirect(_audio_redirect_target()) @ui.post("/audio/delete") def delete_audio() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return client_or_redirect filename = request.form.get("filename", "").strip() if not filename: flash("Nom de fichier manquant.", "error") return redirect(_audio_redirect_target()) target = _music_dir() / filename if not target.exists() or not target.is_file(): flash("Fichier introuvable.", "error") return redirect(_audio_redirect_target()) client = client_or_redirect try: raw = client.list_triggers() valid_triggers = {k: v for k, v in raw.items() if isinstance(v, dict)} used_audio_files = _used_audio_files_from_triggers(valid_triggers) if Path(filename).name.lower() in used_audio_files: flash("Ce fichier est associé à un trigger et ne peut pas être supprimé.", "error") return redirect(_audio_redirect_target()) except BackendApiError as exc: flash(f"Impossible de vérifier les triggers: {exc}", "error") return redirect(_audio_redirect_target()) target.unlink() flash(f"Fichier {filename} supprimé.", "success") return redirect(_audio_redirect_target()) @ui.post("/audio/youtube/proposed-name") def youtube_proposed_name() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return jsonify({"ok": False, "error": "Non connecte"}), 401 youtube_url = request.form.get("youtube_url", "").strip() if youtube_url == "": return jsonify({"ok": False, "error": "Lien YouTube manquant."}), 400 yt_dlp_path = shutil.which("yt-dlp") if yt_dlp_path is None: return jsonify({"ok": False, "error": "yt-dlp est introuvable sur le système."}), 400 if shutil.which("node") is None: return jsonify({"ok": False, "error": "node est requis pour l'option --js-runtimes node."}), 400 command = [ yt_dlp_path, "--js-runtimes", "node", "--remote-components", "ejs:github", "--no-playlist", "--skip-download", "--print", "%(title)s", youtube_url, ] try: result = subprocess.run(command, check=False, capture_output=True, text=True) except OSError as exc: return jsonify({"ok": False, "error": f"Impossible d'exécuter yt-dlp: {exc}"}), 500 if result.returncode != 0: detail = (result.stderr or result.stdout or "").strip().splitlines() message = detail[-1] if detail else "Erreur inconnue" return jsonify({"ok": False, "error": f"Échec lecture metadonnées YouTube: {message}"}), 502 title = next((line.strip() for line in result.stdout.splitlines() if line.strip()), "") proposed = _normalize_youtube_filename(title) or "audio_youtube.mp3" existing_names = {item.name.lower() for item in _music_dir().iterdir() if item.is_file()} available_name = _next_available_filename(proposed, existing_names) return jsonify({"ok": True, "filename": available_name}) @ui.post("/audio/youtube/start") def youtube_download_start() -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return jsonify({"ok": False, "error": "Non connecte"}), 401 youtube_url = request.form.get("youtube_url", "").strip() if youtube_url == "": return jsonify({"ok": False, "error": "Lien YouTube manquant."}), 400 requested_name = request.form.get("youtube_filename", "").strip() music_dir = _music_dir() command, command_error = _build_youtube_download_command(youtube_url, requested_name, music_dir) if command is None: return jsonify({"ok": False, "error": command_error or "Impossible de préparer la commande yt-dlp."}), 400 job_id = _create_youtube_job() worker = threading.Thread(target=_run_youtube_download_job, args=(job_id, command), daemon=True) worker.start() return jsonify({"ok": True, "job_id": job_id}) @ui.get("/audio/youtube/status/") def youtube_download_status(job_id: str) -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return jsonify({"ok": False, "error": "Non connecte"}), 401 with _YOUTUBE_JOBS_LOCK: job = _YOUTUBE_JOBS.get(job_id) if job is None: return jsonify({"ok": False, "error": "Téléchargement introuvable."}), 404 payload = { "ok": True, "status": str(job.get("status", "running")), "percent": float(job.get("percent", 0.0)), "message": str(job.get("message", "Téléchargement en cours...")), } return jsonify(payload) @ui.get("/audio/download/") def download_audio(filename: str) -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return client_or_redirect safe_name = Path(filename).name target = _music_dir() / safe_name if not target.exists() or not target.is_file(): flash("Fichier introuvable.", "error") return redirect(url_for("ui.dashboard")) return send_file(target, as_attachment=True) @ui.get("/audio/stream/") def stream_audio(filename: str) -> Response: client_or_redirect = _ensure_login() if isinstance(client_or_redirect, Response): return client_or_redirect safe_name = Path(filename).name target = _music_dir() / safe_name if not target.exists() or not target.is_file(): flash("Fichier introuvable.", "error") return redirect(url_for("ui.dashboard")) return send_file(target, as_attachment=False)