You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

789 lines
28 KiB

from __future__ import annotations

import json
import math
import re
import shutil
import subprocess
import tempfile
import threading
import uuid
from pathlib import Path

from flask import (
    Blueprint,
    Response,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from werkzeug.utils import secure_filename

from .backend_client import BackendApiError, BackendClient, read_default_backend_url
# Blueprint that carries every route declared in this module.
ui = Blueprint("ui", __name__, template_folder="templates")
# A trigger "type" must be the literal "GPIO" followed by one or more digits.
GPIO_PATTERN = re.compile(r"^GPIO\d+$")
# Lower-case file suffixes accepted as audio files (listing and upload).
ALLOWED_AUDIO_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".aac", ".m4a"}
# Captures the numeric part of the first "<number>%" found in an output line.
YOUTUBE_PROGRESS_PATTERN = re.compile(r"(\d+(?:\.\d+)?)%")
# Lower-case substrings; a yt-dlp output line containing one of them is
# treated as belonging to the post-processing (conversion) phase.
YOUTUBE_POSTPROCESS_HINTS = (
"extractaudio",
"ffmpeg",
"postprocess",
"post-process",
"merging",
"deleting original file",
"destination",
)
# Characters replaced by a space when deriving a file name from user text.
INVALID_FILENAME_CHARS_PATTERN = re.compile(r"[<>:\"/\\|?*\x00-\x1f]")
# Directory (under the system temp dir) holding one JSON state file per job.
YOUTUBE_JOBS_DIR = Path(tempfile.gettempdir()) / "pysonnerie-youtube-jobs"
# Lock taken around accesses to _YOUTUBE_JOBS.
_YOUTUBE_JOBS_LOCK = threading.Lock()
# In-memory job state, keyed by job id.
_YOUTUBE_JOBS: dict[str, dict[str, object]] = {}
def _backend_client() -> BackendClient | None:
    """Build a backend client from the credentials stored in the session.

    Returns:
        A ``BackendClient`` when the session holds a non-empty backend URL,
        username and password; ``None`` as soon as one of them is missing.
    """
    credentials = [
        session.get(field)
        for field in ("backend_url", "backend_username", "backend_password")
    ]
    if not all(credentials):
        return None
    return BackendClient(*(str(part) for part in credentials))
def _ensure_login() -> BackendClient | Response:
    """Return the session's backend client, or a redirect to the login page.

    When no client can be built from the session, an error message is flashed
    and a redirect response to ``ui.login`` is returned instead.
    """
    client = _backend_client()
    if client is not None:
        return client
    flash("Veuillez vous connecter d'abord.", "error")
    return redirect(url_for("ui.login"))
def _music_dir() -> Path:
    """Return the ``MUSIC_DIR`` configured on the app, creating it if needed."""
    directory = current_app.config["MUSIC_DIR"]
    # Missing parents are created too; an existing directory is not an error.
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def _list_audio_files() -> list[str]:
    """List the audio file names in the music directory.

    Only regular files whose lower-cased suffix is in
    ``ALLOWED_AUDIO_EXTENSIONS`` are kept; names are sorted case-insensitively.
    """
    names = [
        entry.name
        for entry in _music_dir().iterdir()
        if entry.is_file() and entry.suffix.lower() in ALLOWED_AUDIO_EXTENSIONS
    ]
    names.sort(key=str.lower)
    return names
def _used_audio_files_from_triggers(triggers: dict[str, dict]) -> set[str]:
used: set[str] = set()
for trigger in triggers.values():
if not isinstance(trigger, dict):
continue
music_file = str(trigger.get("music_file", "")).strip()
if music_file:
used.add(Path(music_file).name.lower())
return used
def _parse_optional_float(value: str) -> float | None:
clean = value.strip()
if clean == "":
return None
return float(clean)
def _next_numeric_trigger_id(triggers: dict[str, dict]) -> str:
max_id = 0
for key in triggers:
if key.isdigit():
max_id = max(max_id, int(key))
return str(max_id + 1)
def _normalize_key(value: str) -> str:
return value.strip().lower()
def _audio_redirect_target() -> str:
    """Return the URL to go back to after an audio form action.

    The ``return_to`` form field selects the audio storage page when it equals
    ``"audio_storage"``; any other value falls back to the dashboard.
    """
    wants_storage = request.form.get("return_to", "").strip() == "audio_storage"
    endpoint = "ui.audio_storage" if wants_storage else "ui.dashboard"
    return url_for(endpoint)
def _normalize_youtube_filename(raw_name: str) -> str | None:
    """Turn a requested name or video title into a safe ``.mp3`` file name.

    Readable names (including spaces and inner dots) are kept; path
    separators and other characters matched by
    ``INVALID_FILENAME_CHARS_PATTERN`` are replaced by spaces.

    Args:
        raw_name: Text typed by the user or a title reported by yt-dlp.

    Returns:
        ``"<stem>.mp3"``, or ``None`` when nothing usable remains.
    """
    candidate = raw_name.strip()

    # Drop a trailing extension only when it is a known audio extension.
    # Taking ``Path(...).stem`` unconditionally cut titles at their last dot
    # ("Mr. Brightside" became "Mr").
    base, dot, extension = candidate.rpartition(".")
    if dot and base and f".{extension.lower()}" in ALLOWED_AUDIO_EXTENSIONS:
        candidate = base

    # Separators are replaced rather than used to pick the last path
    # component, so "AC/DC - Song" keeps its first part. No separator survives
    # the substitution, so the result is always a single path component.
    stem = INVALID_FILENAME_CHARS_PATTERN.sub(" ", candidate)
    # Collapse whitespace runs, then trim spaces and dots so the stem can
    # never be "." or "..".
    stem = re.sub(r"\s+", " ", stem).strip(" .")
    if not stem:
        return None
    return f"{stem}.mp3"
def _next_available_filename(filename: str, existing_names: set[str]) -> str:
candidate = filename
stem = Path(filename).stem
suffix = Path(filename).suffix or ".mp3"
index = 2
while candidate.lower() in existing_names:
candidate = f"{stem}_{index}{suffix}"
index += 1
return candidate
def _build_youtube_download_command(youtube_url: str, requested_name: str, music_dir: Path) -> tuple[list[str] | None, str | None]:
yt_dlp_path = shutil.which("yt-dlp")
if yt_dlp_path is None:
return None, "yt-dlp est introuvable sur le système."
if shutil.which("node") is None:
return None, "node est requis pour l'option --js-runtimes node."
command = [
yt_dlp_path,
"--js-runtimes",
"node",
"--remote-components",
"ejs:github",
"--newline",
"-x",
"--audio-format",
"mp3",
]
if requested_name:
normalized_name = _normalize_youtube_filename(requested_name)
if normalized_name is None:
return None, "Nom de fichier YouTube invalide."
existing_names = {item.name.lower() for item in music_dir.iterdir() if item.is_file()}
if normalized_name.lower() in existing_names:
return None, "Un fichier avec ce nom existe déjà."
output_template = music_dir / f"{Path(normalized_name).stem}.%(ext)s"
command.extend(["-o", str(output_template)])
else:
command.extend(["-P", str(music_dir)])
command.append(youtube_url)
return command, None
def _extract_progress_percent(line: str) -> float | None:
    """Extract a progress percentage from one line of output.

    Returns:
        The first value matched by ``YOUTUBE_PROGRESS_PATTERN`` limited to the
        range 0-100, or ``None`` when the line holds no percentage.
    """
    found = YOUTUBE_PROGRESS_PATTERN.search(line)
    if found is None:
        return None
    try:
        percent = float(found.group(1))
    except ValueError:
        return None
    if percent < 0.0:
        return 0.0
    if percent > 100.0:
        return 100.0
    return percent
def _set_youtube_job_state(job_id: str, **updates: object) -> None:
    """Merge *updates* into a job's state, in memory and on disk.

    The current state is taken from ``_YOUTUBE_JOBS`` or, failing that, from
    the job's JSON file (an unreadable file counts as no state). The merged
    state is written to a temporary file which then replaces the job file.
    """
    YOUTUBE_JOBS_DIR.mkdir(parents=True, exist_ok=True)
    state_file = YOUTUBE_JOBS_DIR / f"{job_id}.json"
    with _YOUTUBE_JOBS_LOCK:
        state = _YOUTUBE_JOBS.get(job_id)
        if state is None and state_file.exists():
            try:
                state = json.loads(state_file.read_text(encoding="utf-8"))
            except Exception:
                state = None
        if state is None:
            state = {}
        state.update(updates)
        _YOUTUBE_JOBS[job_id] = state

        serialized = json.dumps(state, ensure_ascii=False)
        # Write to a sibling file first, then swap it into place.
        scratch_file = state_file.with_suffix(".json.tmp")
        scratch_file.write_text(serialized, encoding="utf-8")
        scratch_file.replace(state_file)
def _create_youtube_job() -> str:
    """Register a new download job in the "running" state.

    The initial state (0 %, preparation message) is stored in
    ``_YOUTUBE_JOBS`` and written to the job's JSON file.

    Returns:
        The new job id, a random UUID in hexadecimal form.
    """
    new_id = uuid.uuid4().hex
    YOUTUBE_JOBS_DIR.mkdir(parents=True, exist_ok=True)
    initial_state = {
        "status": "running",
        "percent": 0.0,
        "message": "Préparation du téléchargement...",
    }
    state_file = YOUTUBE_JOBS_DIR / f"{new_id}.json"
    with _YOUTUBE_JOBS_LOCK:
        _YOUTUBE_JOBS[new_id] = initial_state
        # Write to a sibling file first, then swap it into place.
        scratch_file = state_file.with_suffix(".json.tmp")
        scratch_file.write_text(json.dumps(initial_state, ensure_ascii=False), encoding="utf-8")
        scratch_file.replace(state_file)
    return new_id
def _run_youtube_download_job(job_id: str, command: list[str]) -> None:
    """Run a yt-dlp command and mirror its progress into the job state.

    The merged stdout/stderr of the process is read line by line. Download
    percentages are reported (capped at 99 %) until post-processing starts;
    from then on the job shows 99 % with a "conversion" message. The final
    state is "completed" at 100 % on exit code 0, otherwise "error" with the
    last output line as detail.

    Args:
        job_id: Identifier of the job whose state is updated.
        command: Full command line to execute.
    """
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
    except OSError as exc:
        _set_youtube_job_state(job_id, status="error", message=f"Impossible d'exécuter yt-dlp: {exc}")
        return

    conversion_message = "Conversion audio en cours..."
    last_line = ""
    max_percent = 0.0
    in_postprocess = False

    # Using the process as a context manager closes the output pipe and waits
    # for the child even if a state update raises inside the loop.
    with process:
        lines = process.stdout if process.stdout is not None else ()
        for raw_line in lines:
            line = raw_line.strip()
            if not line:
                continue
            last_line = line
            lower_line = line.lower()

            # "[download] Destination: ..." is printed before the transfer
            # starts; it contains the "destination" hint but is not a
            # post-processing step, so download lines never trigger the switch.
            if (
                not in_postprocess
                and not lower_line.startswith("[download]")
                and any(hint in lower_line for hint in YOUTUBE_POSTPROCESS_HINTS)
            ):
                in_postprocess = True
                _set_youtube_job_state(
                    job_id,
                    status="running",
                    percent=99.0,
                    message=conversion_message,
                )

            progress = _extract_progress_percent(line)
            if progress is None:
                continue
            max_percent = max(max_percent, progress)
            if max_percent >= 99.0:
                in_postprocess = True

            if in_postprocess:
                # Stay at 99 % so the displayed value never moves backwards
                # and 100 % remains reserved for the completed state.
                shown_percent = 99.0
                message = conversion_message
            else:
                shown_percent = max_percent
                message = f"Téléchargement en cours: {shown_percent:.1f}%"
            _set_youtube_job_state(
                job_id,
                status="running",
                percent=shown_percent,
                message=message,
            )
        return_code = process.wait()

    if return_code == 0:
        _set_youtube_job_state(
            job_id,
            status="completed",
            percent=100.0,
            message="Audio YouTube extrait et ajouté au stockage.",
        )
        return

    detail = last_line if last_line else "Erreur inconnue"
    _set_youtube_job_state(job_id, status="error", message=f"Échec extraction YouTube: {detail}")
def _trigger_sort_key(item: tuple[str, dict]) -> tuple[int, int | str]:
trigger_id = item[0]
if trigger_id.isdigit():
return (0, int(trigger_id))
return (1, trigger_id.lower())
@ui.get("/")
def home() -> Response:
    """Redirect to the dashboard when a session client exists, else to login."""
    endpoint = "ui.login" if _backend_client() is None else "ui.dashboard"
    return redirect(url_for(endpoint))
@ui.route("/login", methods=["GET", "POST"])
def login() -> str | Response:
    """Show the login form and, on POST, verify the submitted credentials.

    The credentials are checked against the backend (``health`` followed by
    ``list_triggers``) and stored in the session only once that check passed.
    A failed attempt therefore never leaves credentials in the session that
    would satisfy ``_ensure_login`` on routes that do not call the backend.

    Returns:
        The rendered login page (GET or failed POST), or a redirect to the
        dashboard after a successful login.
    """
    project_root: Path = current_app.config["PROJECT_ROOT"]
    default_url = read_default_backend_url(project_root)

    if request.method != "POST":
        return render_template("login.html", login_error=None, attempted_username="")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")

    # Missing credentials used to trip an ``assert`` (HTTP 500); report them
    # on the form instead.
    if not username or not password:
        return render_template(
            "login.html",
            login_error="Veuillez saisir un nom d'utilisateur et un mot de passe.",
            attempted_username=username,
        )

    connection_error = "Impossible de se connecter au service pour le moment. Vérifiez que le backend est demarré puis réessayez."
    if not default_url:
        return render_template(
            "login.html",
            login_error=connection_error,
            attempted_username=username,
        )

    client = BackendClient(str(default_url), username, password)
    try:
        client.health()
        client.list_triggers()
    except BackendApiError:
        return render_template(
            "login.html",
            login_error=connection_error,
            attempted_username=username,
        )

    # NOTE(review): the password is kept in the session; with Flask's default
    # cookie sessions that data is signed but not encrypted — confirm the
    # session backend used by this app.
    session["backend_url"] = default_url
    session["backend_username"] = username
    session["backend_password"] = password
    flash("Connexion au backend reussie.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.get("/logout")
def logout() -> Response:
    """Clear the session and send the user back to the login page."""
    session.clear()
    flash("Session fermee.", "info")
    login_url = url_for("ui.login")
    return redirect(login_url)
@ui.get("/dashboard")
def dashboard() -> str | Response:
    """Render the dashboard with the sorted triggers and the audio files.

    When the backend cannot be reached, an error is flashed and the page is
    rendered with no triggers and no "used" audio files.
    """
    outcome = _ensure_login()
    if isinstance(outcome, Response):
        return outcome

    ordered_triggers: dict[str, dict] = {}
    files_in_use: set[str] = set()
    try:
        fetched = outcome.list_triggers()
    except BackendApiError as exc:
        flash(f"Impossible de charger les triggers: {exc}", "error")
    else:
        # Entries that are not dicts are dropped before sorting.
        well_formed = {key: value for key, value in fetched.items() if isinstance(value, dict)}
        ordered_triggers = dict(sorted(well_formed.items(), key=_trigger_sort_key))
        files_in_use = _used_audio_files_from_triggers(well_formed)

    return render_template(
        "dashboard.html",
        triggers=ordered_triggers,
        audio_files=_list_audio_files(),
        used_audio_files=files_in_use,
        backend_url=session.get("backend_url", ""),
        username=session.get("backend_username", ""),
    )
@ui.get("/audio-storage")
def audio_storage() -> str | Response:
    """Render the audio storage page.

    The page lists the stored audio files together with the set of file names
    referenced by triggers; that set is empty when the backend call fails.
    """
    outcome = _ensure_login()
    if isinstance(outcome, Response):
        return outcome

    stored_files = _list_audio_files()
    files_in_use: set[str] = set()
    try:
        fetched = outcome.list_triggers()
    except BackendApiError as exc:
        flash(f"Impossible de charger les triggers: {exc}", "error")
    else:
        well_formed = {key: value for key, value in fetched.items() if isinstance(value, dict)}
        files_in_use = _used_audio_files_from_triggers(well_formed)

    return render_template(
        "audio_storage.html",
        audio_files=stored_files,
        used_audio_files=files_in_use,
    )
@ui.post("/trigger/save")
def save_trigger() -> Response:
    """Create or update a trigger from the submitted form.

    The form is validated (GPIO type format, required fields, numeric ranges,
    unique type and name among the other triggers) before the trigger is sent
    to the backend. An existing ``original_id`` updates that trigger; any
    other value creates a new one under the next numeric id.

    Requests carrying ``X-Requested-With: fetch`` receive JSON (401 when not
    logged in, 400 on validation errors, 502 on backend errors); other
    requests get a flashed message and a redirect to the dashboard.
    """
    wants_json = request.headers.get("X-Requested-With") == "fetch"

    client_or_redirect = _ensure_login()
    if isinstance(client_or_redirect, Response):
        if wants_json:
            return jsonify({"ok": False, "error": "Non connecte"}), 401
        return client_or_redirect
    client = client_or_redirect

    form = request.form
    original_id = form.get("original_id", "").strip()
    trigger_type = form.get("type", "").strip()
    name = form.get("name", "").strip()
    music_file = form.get("music_file", "").strip()
    normalize_audio = form.get("normalize_audio") in {"on", "true", "1"}

    def _err(msg: str) -> Response:
        """Report a validation error in the format the caller expects."""
        if wants_json:
            return jsonify({"ok": False, "error": msg}), 400
        flash(msg, "error")
        return redirect(url_for("ui.dashboard"))

    if not GPIO_PATTERN.match(trigger_type):
        return _err("Le type doit respecter le format GPIO<number>.")
    if not name or not music_file:
        return _err("Le nom et le fichier audio sont obligatoires.")

    numeric_error = "Les temps de debut/fin, fondus, repetitions et le volume doivent être numériques."
    try:
        start_seconds = float(form.get("start_seconds", "0"))
        end_seconds = _parse_optional_float(form.get("end_seconds", ""))
        volume = int(form.get("volume", "80"))
        fade_in_seconds = float(form.get("fade_in_seconds", "0"))
        fade_out_seconds = float(form.get("fade_out_seconds", "0"))
        repeat_count = int(form.get("repeat_count", "0"))
    except ValueError:
        return _err(numeric_error)

    # float() accepts "nan" and "inf"; both slip through every comparison
    # below and are not valid JSON numbers, so reject them explicitly.
    float_values = [start_seconds, fade_in_seconds, fade_out_seconds]
    if end_seconds is not None:
        float_values.append(end_seconds)
    if not all(math.isfinite(value) for value in float_values):
        return _err(numeric_error)

    if start_seconds < 0 or (end_seconds is not None and end_seconds <= start_seconds):
        return _err("Fenêtre temporelle invalide.")
    if not (0 <= volume <= 100):
        return _err("Le volume doit être compris entre 0 et 100.")
    if fade_in_seconds < 0 or fade_out_seconds < 0:
        return _err("Les durées de fondu doivent être supérieures ou égales à 0.")
    if repeat_count < 0:
        return _err("Le nombre de répétitions doit être supérieur ou égal à 0.")

    payload = {
        "name": name,
        "type": trigger_type,
        "music_file": music_file,
        "start_seconds": start_seconds,
        "end_seconds": end_seconds,
        "volume": volume,
        "fade_in_seconds": fade_in_seconds,
        "fade_out_seconds": fade_out_seconds,
        "repeat_count": repeat_count,
        "normalize_audio": normalize_audio,
    }

    try:
        existing_raw = client.list_triggers()
        existing_triggers = {k: v for k, v in existing_raw.items() if isinstance(v, dict)}
        if original_id and original_id in existing_triggers:
            trigger_id = original_id
        else:
            trigger_id = _next_numeric_trigger_id(existing_triggers)

        # Type and name must be unique among all *other* triggers.
        for existing_id, existing_trigger in existing_triggers.items():
            if existing_id == trigger_id:
                continue
            if existing_trigger.get("type") == trigger_type:
                return _err("Ce type de trigger est déjà utilisé.")
            if _normalize_key(str(existing_trigger.get("name", ""))) == _normalize_key(name):
                return _err("Ce nom de trigger est déjà utilisé.")

        client.upsert_trigger(trigger_id, payload)
    except BackendApiError as exc:
        if wants_json:
            return jsonify({"ok": False, "error": str(exc)}), 502
        flash(f"Echec d'enregistrement du trigger: {exc}", "error")
        return redirect(url_for("ui.dashboard"))

    if wants_json:
        return jsonify({
            "ok": True,
            "message": f"Trigger \"{name}\" enregistré.",
            "trigger_id": trigger_id,
            "original_id": original_id or trigger_id,
            "trigger": payload,
        })
    flash(f"Trigger \"{name}\" enregistré.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.post("/trigger/delete")
def delete_trigger() -> Response:
    """Delete the trigger identified by the ``trigger_id`` form field.

    Requests carrying ``X-Requested-With: fetch`` receive JSON (401, 400 or
    500 on failure); other requests get a flashed message and a redirect to
    the dashboard.
    """
    via_fetch = request.headers.get("X-Requested-With") == "fetch"

    outcome = _ensure_login()
    if isinstance(outcome, Response):
        if via_fetch:
            return jsonify({"ok": False, "error": "Non connecte"}), 401
        return outcome

    def _failure(text: str, status: int):
        """Report *text* as JSON with *status*, or flash it and redirect."""
        if via_fetch:
            return jsonify({"ok": False, "error": text}), status
        flash(text, "error")
        return redirect(url_for("ui.dashboard"))

    trigger_id = request.form.get("trigger_id", "").strip()
    if not trigger_id:
        return _failure("Identifiant du trigger manquant.", 400)

    try:
        outcome.delete_trigger(trigger_id)
    except BackendApiError as exc:
        return _failure(f"Echec de suppression: {exc}", 500)

    if via_fetch:
        return jsonify({"ok": True, "message": "Trigger supprimé.", "trigger_id": trigger_id})
    flash("Trigger supprimé.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.post("/trigger/play")
def play_trigger() -> Response:
    """Ask the backend to play the trigger given by the ``trigger_id`` field.

    Requests carrying ``X-Requested-With: fetch`` receive JSON (401, 400 or
    502 on failure); other requests get a flashed message and a redirect to
    the dashboard.
    """
    via_fetch = request.headers.get("X-Requested-With") == "fetch"

    outcome = _ensure_login()
    if isinstance(outcome, Response):
        if via_fetch:
            return jsonify({"ok": False, "error": "Non connecte"}), 401
        return outcome

    trigger_id = request.form.get("trigger_id", "").strip()
    if not trigger_id:
        if via_fetch:
            return jsonify({"ok": False, "error": "Identifiant du trigger manquant."}), 400
        flash("Identifiant du trigger manquant.", "error")
        return redirect(url_for("ui.dashboard"))

    try:
        outcome.play_trigger(trigger_id)
    except BackendApiError as exc:
        if via_fetch:
            return jsonify({"ok": False, "error": str(exc)}), 502
        flash(f"Echec du lancement: {exc}", "error")
        return redirect(url_for("ui.dashboard"))

    if via_fetch:
        return jsonify({"ok": True, "message": "Trigger démarré."})
    flash("Trigger démarré.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.post("/audio/stop")
def stop_audio() -> Response:
    """Ask the backend to stop the audio currently playing.

    Requests carrying ``X-Requested-With: fetch`` receive JSON (401 or 502 on
    failure); other requests get a flashed message and a redirect to the
    dashboard.
    """
    via_fetch = request.headers.get("X-Requested-With") == "fetch"

    outcome = _ensure_login()
    if isinstance(outcome, Response):
        if via_fetch:
            return jsonify({"ok": False, "error": "Non connecte"}), 401
        return outcome

    try:
        outcome.stop_audio()
    except BackendApiError as exc:
        if via_fetch:
            return jsonify({"ok": False, "error": str(exc)}), 502
        flash(f"Echec de l'arrêt audio: {exc}", "error")
        return redirect(url_for("ui.dashboard"))

    if via_fetch:
        return jsonify({"ok": True, "message": "Audio arrete."})
    flash("Audio arrete.", "info")
    return redirect(url_for("ui.dashboard"))
@ui.post("/audio/upload")
def upload_audio() -> Response:
    """Add an audio file to the music directory.

    When the form carries a ``youtube_url``, yt-dlp is run synchronously to
    extract the audio. Otherwise the uploaded ``audio_file`` is saved after
    its name, extension and uniqueness have been checked. Every outcome is
    flashed and followed by a redirect to the page the form came from.
    """
    outcome = _ensure_login()
    if isinstance(outcome, Response):
        return outcome

    def _back(text: str, category: str = "error") -> Response:
        """Flash *text* and redirect to the originating page."""
        flash(text, category)
        return redirect(_audio_redirect_target())

    youtube_url = request.form.get("youtube_url", "").strip()
    if youtube_url:
        requested_name = request.form.get("youtube_filename", "").strip()
        command, command_error = _build_youtube_download_command(youtube_url, requested_name, _music_dir())
        if command is None:
            return _back(command_error or "Impossible de préparer la commande yt-dlp.")
        try:
            completed = subprocess.run(command, check=False, capture_output=True, text=True)
        except OSError as exc:
            return _back(f"Impossible d'exécuter yt-dlp: {exc}")
        if completed.returncode != 0:
            # Report the last line of stderr, falling back to stdout.
            output_lines = (completed.stderr or completed.stdout or "").strip().splitlines()
            reason = output_lines[-1] if output_lines else "Erreur inconnue"
            return _back(f"Échec extraction YouTube: {reason}")
        return _back("Audio YouTube extrait et ajouté au stockage.", "success")

    upload = request.files.get("audio_file")
    if upload is None or not (upload.filename or "").strip():
        return _back("Sélectionnez d'abord un fichier.")

    safe_name = secure_filename(upload.filename)
    if not safe_name:
        return _back("Nom de fichier invalide.")
    if Path(safe_name).suffix.lower() not in ALLOWED_AUDIO_EXTENSIONS:
        return _back("Format audio non supporté.")

    storage = _music_dir()
    # Names are compared case-insensitively.
    taken = {entry.name.lower() for entry in storage.iterdir() if entry.is_file()}
    if safe_name.lower() in taken:
        return _back("Un fichier avec ce nom existe déjà.")

    upload.save(storage / safe_name)
    return _back(f"Fichier {safe_name} téléversé.", "success")
@ui.post("/audio/delete")
def delete_audio() -> Response:
    """Delete an audio file from the music directory.

    The ``filename`` form field must be a bare file name; anything containing
    a directory part is rejected so that only files directly inside the music
    directory can be removed. Files referenced by a trigger are kept. Every
    outcome is flashed and followed by a redirect to the originating page.
    """
    client_or_redirect = _ensure_login()
    if isinstance(client_or_redirect, Response):
        return client_or_redirect
    client = client_or_redirect

    filename = request.form.get("filename", "").strip()
    if not filename:
        flash("Nom de fichier manquant.", "error")
        return redirect(_audio_redirect_target())

    # The name comes straight from the form: without this check "../x" or an
    # absolute path would resolve outside the music directory and be deleted.
    safe_name = Path(filename).name
    if safe_name != filename:
        flash("Nom de fichier invalide.", "error")
        return redirect(_audio_redirect_target())

    target = _music_dir() / safe_name
    if not target.exists() or not target.is_file():
        flash("Fichier introuvable.", "error")
        return redirect(_audio_redirect_target())

    try:
        raw = client.list_triggers()
    except BackendApiError as exc:
        # Without the trigger list the file cannot be proven unused.
        flash(f"Impossible de vérifier les triggers: {exc}", "error")
        return redirect(_audio_redirect_target())

    valid_triggers = {k: v for k, v in raw.items() if isinstance(v, dict)}
    used_audio_files = _used_audio_files_from_triggers(valid_triggers)
    if safe_name.lower() in used_audio_files:
        flash("Ce fichier est associé à un trigger et ne peut pas être supprimé.", "error")
        return redirect(_audio_redirect_target())

    try:
        target.unlink()
    except OSError as exc:
        # The file may have vanished or be write-protected since the check.
        flash(f"Impossible de supprimer le fichier: {exc}", "error")
        return redirect(_audio_redirect_target())

    flash(f"Fichier {safe_name} supprimé.", "success")
    return redirect(_audio_redirect_target())
@ui.post("/audio/youtube/proposed-name")
def youtube_proposed_name() -> Response:
    """Suggest a free ``.mp3`` file name derived from a video's title.

    yt-dlp is run without downloading to print the title of ``youtube_url``;
    the title is normalized into a file name and numbered if already taken.

    Returns:
        JSON ``{"ok": True, "filename": ...}`` on success, or
        ``{"ok": False, "error": ...}`` with status 401, 400, 500 or 502.
    """
    client_or_redirect = _ensure_login()
    if isinstance(client_or_redirect, Response):
        return jsonify({"ok": False, "error": "Non connecte"}), 401

    youtube_url = request.form.get("youtube_url", "").strip()
    if youtube_url == "":
        return jsonify({"ok": False, "error": "Lien YouTube manquant."}), 400

    yt_dlp_path = shutil.which("yt-dlp")
    if yt_dlp_path is None:
        return jsonify({"ok": False, "error": "yt-dlp est introuvable sur le système."}), 400
    if shutil.which("node") is None:
        return jsonify({"ok": False, "error": "node est requis pour l'option --js-runtimes node."}), 400

    command = [
        yt_dlp_path,
        "--js-runtimes",
        "node",
        "--remote-components",
        "ejs:github",
        "--no-playlist",
        "--skip-download",
        "--print",
        "%(title)s",
        # "--" ends option parsing, so a user-supplied value starting with
        # "-" can only ever be treated as a URL.
        "--",
        youtube_url,
    ]
    try:
        result = subprocess.run(command, check=False, capture_output=True, text=True)
    except OSError as exc:
        return jsonify({"ok": False, "error": f"Impossible d'exécuter yt-dlp: {exc}"}), 500

    if result.returncode != 0:
        # Report the last line of stderr, falling back to stdout.
        detail = (result.stderr or result.stdout or "").strip().splitlines()
        message = detail[-1] if detail else "Erreur inconnue"
        return jsonify({"ok": False, "error": f"Échec lecture metadonnées YouTube: {message}"}), 502

    # The title is the first non-blank line printed on stdout.
    title = next((line.strip() for line in result.stdout.splitlines() if line.strip()), "")
    proposed = _normalize_youtube_filename(title) or "audio_youtube.mp3"
    existing_names = {item.name.lower() for item in _music_dir().iterdir() if item.is_file()}
    available_name = _next_available_filename(proposed, existing_names)
    return jsonify({"ok": True, "filename": available_name})
@ui.post("/audio/youtube/start")
def youtube_download_start() -> Response:
    """Start a background download job and return its id.

    Returns:
        JSON ``{"ok": True, "job_id": ...}`` once the worker thread has been
        started, or ``{"ok": False, "error": ...}`` with status 401 or 400.
    """
    if isinstance(_ensure_login(), Response):
        return jsonify({"ok": False, "error": "Non connecte"}), 401

    form = request.form
    youtube_url = form.get("youtube_url", "").strip()
    if not youtube_url:
        return jsonify({"ok": False, "error": "Lien YouTube manquant."}), 400

    requested_name = form.get("youtube_filename", "").strip()
    command, command_error = _build_youtube_download_command(youtube_url, requested_name, _music_dir())
    if command is None:
        error_text = command_error or "Impossible de préparer la commande yt-dlp."
        return jsonify({"ok": False, "error": error_text}), 400

    job_id = _create_youtube_job()
    threading.Thread(
        target=_run_youtube_download_job,
        args=(job_id, command),
        daemon=True,
    ).start()
    return jsonify({"ok": True, "job_id": job_id})
@ui.get("/audio/youtube/status/<job_id>")
def youtube_download_status(job_id: str) -> Response:
    """Report the status, percentage and message of a download job.

    The state is taken from ``_YOUTUBE_JOBS`` or, failing that, from the
    job's JSON file.

    Returns:
        JSON with ``ok``, ``status``, ``percent`` and ``message``; status 401
        when not logged in and 404 when the job is unknown or unreadable.
    """
    if isinstance(_ensure_login(), Response):
        return jsonify({"ok": False, "error": "Non connecte"}), 401

    with _YOUTUBE_JOBS_LOCK:
        state = _YOUTUBE_JOBS.get(job_id)
        if state is None:
            state_file = YOUTUBE_JOBS_DIR / f"{job_id}.json"
            if state_file.exists():
                try:
                    state = json.loads(state_file.read_text(encoding="utf-8"))
                except Exception:
                    state = None

    if state is None:
        return jsonify({"ok": False, "error": "Téléchargement introuvable."}), 404

    status = str(state.get("status", "running"))
    percent = float(state.get("percent", 0.0))
    message = str(state.get("message", "Téléchargement en cours..."))
    return jsonify({"ok": True, "status": status, "percent": percent, "message": message})
@ui.get("/audio/download/<path:filename>")
def download_audio(filename: str) -> Response:
    """Send a stored audio file as an attachment.

    Only the last path component of *filename* is used, so the file is always
    looked up directly inside the music directory. A missing file flashes an
    error and redirects to the dashboard.
    """
    outcome = _ensure_login()
    if isinstance(outcome, Response):
        return outcome

    audio_path = _music_dir() / Path(filename).name
    if audio_path.exists() and audio_path.is_file():
        return send_file(audio_path, as_attachment=True)

    flash("Fichier introuvable.", "error")
    return redirect(url_for("ui.dashboard"))
@ui.get("/audio/stream/<path:filename>")
def stream_audio(filename: str) -> Response:
    """Send a stored audio file inline (not as an attachment).

    Only the last path component of *filename* is used, so the file is always
    looked up directly inside the music directory. A missing file flashes an
    error and redirects to the dashboard.
    """
    outcome = _ensure_login()
    if isinstance(outcome, Response):
        return outcome

    audio_path = _music_dir() / Path(filename).name
    if audio_path.exists() and audio_path.is_file():
        return send_file(audio_path, as_attachment=False)

    flash("Fichier introuvable.", "error")
    return redirect(url_for("ui.dashboard"))