You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

326 lines
10 KiB

from __future__ import annotations

import math
import re
from pathlib import Path

from flask import (
    Blueprint,
    Response,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from werkzeug.utils import secure_filename

from .backend_client import BackendApiError, BackendClient, read_default_backend_url
# Blueprint on which every route in this module is registered.
ui = Blueprint("ui", __name__, template_folder="templates", static_folder="static")
# Accepted trigger "type" values: the literal "GPIO" followed by one or more digits.
GPIO_PATTERN = re.compile(r"^GPIO\d+$")
# Lower-case file suffixes accepted for upload and shown in the audio file listing.
ALLOWED_AUDIO_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".aac", ".m4a"}
def _backend_client() -> BackendClient | None:
    """Build a backend client from the connection details held in the session.

    Returns:
        A ``BackendClient`` when the session holds a truthy backend URL,
        username and password; ``None`` if any of the three is missing.
    """
    stored = [
        session.get(key)
        for key in ("backend_url", "backend_username", "backend_password")
    ]
    if not all(stored):
        return None
    url, user, secret = (str(item) for item in stored)
    return BackendClient(url, user, secret)
def _ensure_login() -> BackendClient | Response:
    """Return the session's backend client, or a redirect to the login page.

    When no client can be built from the session, an error message is
    flashed and the redirect response is returned instead; callers tell the
    two cases apart with ``isinstance(..., Response)``.
    """
    active_client = _backend_client()
    if active_client is not None:
        return active_client
    flash("Veuillez vous connecter d'abord.", "error")
    return redirect(url_for("ui.login"))
def _music_dir() -> Path:
    """Return the configured music directory, creating it when absent.

    The value is read from ``current_app.config["MUSIC_DIR"]``.
    NOTE(review): assumes that config entry is already a ``Path`` (it must
    expose ``mkdir``) - confirm against the application factory.
    """
    directory = current_app.config["MUSIC_DIR"]
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def _list_audio_files() -> list[str]:
    """List the audio file names found directly inside the music directory.

    Only regular files whose lower-cased suffix is in
    ``ALLOWED_AUDIO_EXTENSIONS`` are kept; names are sorted
    case-insensitively.
    """
    names = [
        entry.name
        for entry in _music_dir().iterdir()
        if entry.is_file() and entry.suffix.lower() in ALLOWED_AUDIO_EXTENSIONS
    ]
    names.sort(key=str.lower)
    return names
def _parse_optional_float(value: str) -> float | None:
    """Parse an optional numeric form field.

    Args:
        value: Raw text from the form; surrounding whitespace is ignored.

    Returns:
        ``None`` when the field is blank, otherwise the parsed number.

    Raises:
        ValueError: If the text is not a number, or is a non-finite value
            such as ``nan`` or ``inf``.
    """
    clean = value.strip()
    if not clean:
        return None
    number = float(clean)
    # float() happily accepts "nan"/"inf"; those defeat ordering comparisons
    # (every comparison with NaN is False), so treat them as invalid input.
    if not math.isfinite(number):
        raise ValueError(f"non-finite value is not allowed: {clean!r}")
    return number
@ui.get("/")
def home() -> Response:
    """Send the visitor to the dashboard when logged in, else to the login page."""
    endpoint = "ui.login" if _backend_client() is None else "ui.dashboard"
    return redirect(url_for(endpoint))
@ui.route("/login", methods=["GET", "POST"])
def login() -> str | Response:
    """Show the login form and, on POST, verify the credentials with the backend.

    The backend URL always comes from ``read_default_backend_url``; only the
    username and password are taken from the submitted form. The credentials
    are written to the session only after ``health()`` and
    ``list_triggers()`` both succeed, so a failed attempt never leaves the
    visitor looking logged in. An existing session is left untouched by a
    failed attempt.

    Returns:
        The rendered login page (GET, or POST with an error), or a redirect
        to the dashboard after a successful login.
    """
    project_root: Path = current_app.config["PROJECT_ROOT"]
    default_url = read_default_backend_url(project_root)

    if request.method != "POST":
        return render_template("login.html", login_error=None, attempted_username="")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")

    # Reject blank credentials up front instead of relying on an assert,
    # which would surface as a server error (and is stripped under -O).
    if not username or not password:
        return render_template(
            "login.html",
            login_error="Veuillez saisir un nom d'utilisateur et un mot de passe.",
            attempted_username=username,
        )

    unavailable_message = "Impossible de se connecter au service pour le moment. Vérifiez que le backend est demarré puis réessayez."
    if not default_url:
        # No backend URL configured: nothing to connect to.
        return render_template(
            "login.html",
            login_error=unavailable_message,
            attempted_username=username,
        )

    # Verify with a throwaway client before anything is stored in the session.
    client = BackendClient(str(default_url), username, password)
    try:
        client.health()
        client.list_triggers()
    except BackendApiError:
        return render_template(
            "login.html",
            login_error=unavailable_message,
            attempted_username=username,
        )

    # NOTE(review): the plain-text password is kept in the session. With
    # Flask's default signed-cookie sessions that value is readable by the
    # client - confirm a server-side session store is configured.
    session["backend_url"] = default_url
    session["backend_username"] = username
    session["backend_password"] = password

    flash("Connexion au backend reussie.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.get("/logout")
def logout() -> Response:
    """Drop everything stored in the session and return to the login page."""
    session.clear()
    # Flash after clearing so the message is not wiped along with the session.
    flash("Session fermee.", "info")
    login_url = url_for("ui.login")
    return redirect(login_url)
@ui.get("/dashboard")
def dashboard() -> str | Response:
    """Render the dashboard with the backend's triggers and the local audio files.

    A backend failure while listing triggers is reported through a flash
    message and the page is rendered with an empty trigger mapping.
    """
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    trigger_map: dict[str, dict] = {}
    try:
        listing = guard.list_triggers()
        # Keep only entries whose value is a dict.
        trigger_map = {
            key: entry for key, entry in listing.items() if isinstance(entry, dict)
        }
    except BackendApiError as exc:
        flash(f"Impossible de charger les triggers: {exc}", "error")

    context = {
        "triggers": trigger_map,
        "audio_files": _list_audio_files(),
        "backend_url": session.get("backend_url", ""),
        "username": session.get("backend_username", ""),
    }
    return render_template("dashboard.html", **context)
@ui.post("/trigger/save")
def save_trigger() -> Response:
    """Create or update a trigger from the submitted form.

    Form fields read: ``original_id``, ``type``, ``name``, ``music_file``,
    ``start_seconds`` (defaults to "0" when absent) and ``end_seconds``
    (blank means no end). When ``original_id`` is set and differs from the
    new ``type``, the trigger is stored under the new id first and the old
    id is deleted afterwards.

    Returns:
        A redirect to the dashboard (or to the login page when no backend
        client is available); the outcome is reported via flash messages.
    """
    client_or_redirect = _ensure_login()
    if isinstance(client_or_redirect, Response):
        return client_or_redirect
    client = client_or_redirect

    original_id = request.form.get("original_id", "").strip()
    trigger_type = request.form.get("type", "").strip()
    name = request.form.get("name", "").strip()
    music_file = request.form.get("music_file", "").strip()
    start_raw = request.form.get("start_seconds", "0")
    end_raw = request.form.get("end_seconds", "")

    if not GPIO_PATTERN.match(trigger_type):
        flash("Le type doit respecter le format GPIO<number>.", "error")
        return redirect(url_for("ui.dashboard"))

    if not name or not music_file:
        flash("Le nom et le fichier audio sont obligatoires.", "error")
        return redirect(url_for("ui.dashboard"))

    try:
        start_seconds = float(start_raw)
        end_seconds = _parse_optional_float(end_raw)
    except ValueError:
        flash("Les temps de debut/fin doivent être numériques.", "error")
        return redirect(url_for("ui.dashboard"))

    # float() accepts "nan"/"inf". Comparisons with NaN are always False, so
    # such values would slip through the window check below and end up in
    # the payload sent to the backend.
    if not math.isfinite(start_seconds) or (
        end_seconds is not None and not math.isfinite(end_seconds)
    ):
        flash("Les temps de debut/fin doivent être numériques.", "error")
        return redirect(url_for("ui.dashboard"))

    if start_seconds < 0 or (end_seconds is not None and end_seconds <= start_seconds):
        flash("Fenêtre temporelle invalide.", "error")
        return redirect(url_for("ui.dashboard"))

    payload = {
        "name": name,
        "type": trigger_type,
        "music_file": music_file,
        "start_seconds": start_seconds,
        "end_seconds": end_seconds,
    }

    try:
        client.upsert_trigger(trigger_type, payload)
        # On a rename, remove the old entry only once the new one is stored,
        # so a failed upsert never loses the existing trigger.
        if original_id and original_id != trigger_type:
            client.delete_trigger(original_id)
    except BackendApiError as exc:
        flash(f"Echec d'enregistrement du trigger: {exc}", "error")
        return redirect(url_for("ui.dashboard"))

    flash(f"Trigger {trigger_type} enregistré.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.post("/trigger/delete")
def delete_trigger() -> Response:
    """Delete the trigger named by the ``trigger_id`` form field.

    The result (success, missing id, or backend failure) is reported with a
    flash message before redirecting back to the dashboard.
    """
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    trigger_id = request.form.get("trigger_id", "").strip()
    if trigger_id:
        try:
            guard.delete_trigger(trigger_id)
        except BackendApiError as exc:
            flash(f"Echec de suppression: {exc}", "error")
        else:
            flash(f"Trigger {trigger_id} supprime.", "success")
    else:
        flash("Identifiant du trigger manquant.", "error")

    return redirect(url_for("ui.dashboard"))
@ui.post("/trigger/play")
def play_trigger() -> Response:
    """Ask the backend to play the trigger named by the ``trigger_id`` form field.

    The result (started, missing id, or backend failure) is reported with a
    flash message before redirecting back to the dashboard.
    """
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    trigger_id = request.form.get("trigger_id", "").strip()
    if trigger_id:
        try:
            guard.play_trigger(trigger_id)
        except BackendApiError as exc:
            flash(f"Echec du lancement: {exc}", "error")
        else:
            flash(f"Trigger {trigger_id} demarré.", "success")
    else:
        flash("Identifiant du trigger manquant.", "error")

    return redirect(url_for("ui.dashboard"))
@ui.post("/audio/stop")
def stop_audio() -> Response:
    """Ask the backend to stop audio playback, then return to the dashboard."""
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    try:
        guard.stop_audio()
    except BackendApiError as exc:
        flash(f"Echec de l'arrêt audio: {exc}", "error")
    else:
        flash("Audio arrete.", "info")

    return redirect(url_for("ui.dashboard"))
@ui.post("/audio/upload")
def upload_audio() -> Response:
    """Store an uploaded audio file in the music directory.

    The upload is refused (with a flash message) when no file was chosen,
    when the sanitized name is empty, when the suffix is not in
    ``ALLOWED_AUDIO_EXTENSIONS``, or when a file with the same name
    (compared case-insensitively) already exists.
    """
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    upload = request.files.get("audio_file")
    if upload is None or not (upload.filename or "").strip():
        flash("Sélectionnez d'abord un fichier.", "error")
        return redirect(url_for("ui.dashboard"))

    # Sanitize the client-supplied name before it touches the filesystem.
    stored_name = secure_filename(upload.filename)
    if not stored_name:
        flash("Nom de fichier invalide.", "error")
        return redirect(url_for("ui.dashboard"))

    if Path(stored_name).suffix.lower() not in ALLOWED_AUDIO_EXTENSIONS:
        flash("Format audio non supporté.", "error")
        return redirect(url_for("ui.dashboard"))

    wanted = stored_name.lower()
    already_there = any(
        entry.is_file() and entry.name.lower() == wanted
        for entry in _music_dir().iterdir()
    )
    if already_there:
        flash("Un fichier avec ce nom existe déjà.", "error")
        return redirect(url_for("ui.dashboard"))

    upload.save(_music_dir() / stored_name)
    flash(f"Fichier {stored_name} téléversé.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.post("/audio/delete")
def delete_audio() -> Response:
    """Delete one file from the music directory.

    The ``filename`` form field must be a bare file name: any value that
    carries a directory component (``../x``, an absolute path, a trailing
    separator) is rejected so the request can never reach outside the
    music directory.

    Returns:
        A redirect to the dashboard (or to the login page when no backend
        client is available); the outcome is reported via flash messages.
    """
    client_or_redirect = _ensure_login()
    if isinstance(client_or_redirect, Response):
        return client_or_redirect

    filename = request.form.get("filename", "").strip()
    if not filename:
        flash("Nom de fichier manquant.", "error")
        return redirect(url_for("ui.dashboard"))

    # The form value is untrusted. Joining it as-is would let "../.." or an
    # absolute path select a file outside the music directory.
    if Path(filename).name != filename:
        flash("Nom de fichier invalide.", "error")
        return redirect(url_for("ui.dashboard"))

    target = _music_dir() / filename
    if not target.exists() or not target.is_file():
        flash("Fichier introuvable.", "error")
        return redirect(url_for("ui.dashboard"))

    try:
        target.unlink()
    except OSError as exc:
        # e.g. permission denied, or the file vanished since the check above.
        flash(f"Echec de suppression: {exc}", "error")
        return redirect(url_for("ui.dashboard"))

    flash(f"Fichier {filename} supprimé.", "success")
    return redirect(url_for("ui.dashboard"))
@ui.get("/audio/download/<path:filename>")
def download_audio(filename: str) -> Response:
    """Send a file from the music directory as a download attachment.

    Only the final path component of ``filename`` is used, so the lookup
    stays inside the music directory. A missing file yields a flash
    message and a redirect to the dashboard.
    """
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    wanted = _music_dir() / Path(filename).name
    if wanted.exists() and wanted.is_file():
        return send_file(wanted, as_attachment=True)

    flash("Fichier introuvable.", "error")
    return redirect(url_for("ui.dashboard"))
@ui.get("/audio/stream/<path:filename>")
def stream_audio(filename: str) -> Response:
    """Send a file from the music directory inline (not as an attachment).

    Only the final path component of ``filename`` is used, so the lookup
    stays inside the music directory. A missing file yields a flash
    message and a redirect to the dashboard.
    """
    guard = _ensure_login()
    if isinstance(guard, Response):
        return guard

    wanted = _music_dir() / Path(filename).name
    if wanted.exists() and wanted.is_file():
        return send_file(wanted, as_attachment=False)

    flash("Fichier introuvable.", "error")
    return redirect(url_for("ui.dashboard"))