from __future__ import annotations import json from pathlib import Path from typing import Any import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class BackendApiError(RuntimeError): pass def read_default_backend_url(project_root: Path) -> str: conf_path = project_root / "backend" / "data" / "conf.json" default_url = "https://127.0.0.1:8443" if not conf_path.exists(): return default_url try: conf = json.loads(conf_path.read_text(encoding="utf-8")) except Exception: return default_url server = conf.get("server", {}) if isinstance(conf, dict) else {} host = str(server.get("host", "127.0.0.1")) port = int(server.get("port", 8443)) if host in {"0.0.0.0", "::"}: host = "127.0.0.1" return f"https://{host}:{port}" class BackendClient: def __init__(self, base_url: str, username: str, password: str) -> None: self.base_url = base_url.rstrip("/") self.auth = (username, password) self.timeout = 8 def _request(self, method: str, path: str, json_payload: dict[str, Any] | None = None) -> Any: url = f"{self.base_url}{path}" try: response = requests.request( method=method, url=url, auth=self.auth, json=json_payload, timeout=self.timeout, verify=False, ) except requests.RequestException as exc: raise BackendApiError(f"Backend inaccessible: {exc}") from exc if response.status_code >= 400: detail = response.text try: payload = response.json() detail = payload.get("detail", detail) except Exception: pass raise BackendApiError(f"Erreur HTTP {response.status_code}: {detail}") if not response.content: return {} try: return response.json() except ValueError: return {} def health(self) -> dict[str, Any]: url = f"{self.base_url}/api/health" try: response = requests.get(url, timeout=self.timeout, verify=False) response.raise_for_status() return response.json() except requests.RequestException as exc: raise BackendApiError(f"Echec du controle de sante: {exc}") from exc def list_triggers(self) -> dict[str, Any]: data = self._request("GET", "/api/triggers") return data if isinstance(data, dict) else {} def upsert_trigger(self, trigger_id: str, payload: dict[str, Any]) -> dict[str, Any]: data = self._request("PUT", f"/api/triggers/{trigger_id}", payload) return data if isinstance(data, dict) else {} def patch_trigger(self, trigger_id: str, payload: dict[str, Any]) -> dict[str, Any]: data = self._request("PATCH", f"/api/triggers/{trigger_id}", payload) return data if isinstance(data, dict) else {} def delete_trigger(self, trigger_id: str) -> dict[str, Any]: data = self._request("DELETE", f"/api/triggers/{trigger_id}") return data if isinstance(data, dict) else {} def play_trigger(self, trigger_id: str, repeat_count: int | None = None) -> dict[str, Any]: path = f"/api/play/{trigger_id}" if repeat_count is not None: path += f"?repeat_count={repeat_count}" data = self._request("GET", path) return data if isinstance(data, dict) else {} def stop_audio(self) -> dict[str, Any]: data = self._request("GET", "/api/stop") return data if isinstance(data, dict) else {}