from __future__ import annotations

import logging
import subprocess
import threading
from pathlib import Path
from typing import Optional

logger = logging.getLogger("pysonnerie.audio")


class AudioPlayer:
    """Play audio files from ``music_dir`` through the SoX ``play`` command.

    At most one playback is tracked at a time: ``play`` stops whatever this
    instance started before launching a new player process. MP3 files are
    decoded by ``ffmpeg`` and streamed to ``play`` as WAV over a pipe.
    All process bookkeeping is guarded by a re-entrant lock.
    """

    # Seconds granted to a child process to exit after terminate()/kill().
    _STOP_TIMEOUT_SECONDS = 2

    def __init__(self, music_dir: Path):
        self.music_dir = music_dir
        # Re-entrant because play() calls stop() while already holding it.
        self._lock = threading.RLock()
        # The ``play`` process of the current playback, if any.
        self._proc: Optional[subprocess.Popen] = None
        # The ``ffmpeg`` decoder feeding ``play`` (MP3 playback only).
        self._decoder_proc: Optional[subprocess.Popen] = None

    def _resolve_music_path(self, music_file: str) -> Path:
        """Return the resolved path of ``music_file`` inside ``music_dir``.

        Raises:
            ValueError: the resolved path is not located below ``music_dir``.
            FileNotFoundError: the path does not exist or is not a file.
        """
        candidate = (self.music_dir / music_file).resolve()
        if self.music_dir.resolve() not in candidate.parents:
            raise ValueError("music_file path traversal is forbidden")
        if not candidate.exists() or not candidate.is_file():
            raise FileNotFoundError(f"Music file not found: {music_file}")
        return candidate

    def _read_duration_seconds(self, music_path: Path) -> Optional[float]:
        """Return the duration of ``music_path`` in seconds, or ``None``.

        ``soxi`` is tried first; ``ffprobe`` is the fallback for setups where
        sox lacks some codecs (e.g. mp3). Each failed probe logs a warning.
        """
        probes = (
            (
                "Unable to read duration with soxi for %s",
                ["soxi", "-D", str(music_path)],
            ),
            (
                "Unable to read duration with ffprobe for %s",
                [
                    "ffprobe",
                    "-v",
                    "error",
                    "-show_entries",
                    "format=duration",
                    "-of",
                    "default=noprint_wrappers=1:nokey=1",
                    str(music_path),
                ],
            ),
        )
        for failure_message, probe_cmd in probes:
            try:
                result = subprocess.run(
                    probe_cmd,
                    capture_output=True,
                    check=True,
                    text=True,
                )
                return float(result.stdout.strip())
            except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
                logger.warning(failure_message, music_path)
        return None

    def _build_effects(
        self,
        music_path: Path,
        start_seconds: float,
        duration: Optional[float],
        fade_in_seconds: float,
        fade_out_seconds: float,
        repeat_count: int,
        normalize_audio: bool,
    ) -> list[str]:
        """Build the SoX effect arguments (gain, trim, fade, repeat).

        ``duration`` is the length of the playback window, or ``None`` to
        play to the end of the file. The file duration is only probed when a
        fade is requested and ``duration`` is ``None``.

        Raises:
            ValueError: a fade is requested and the playback window is empty.
        """
        effects: list[str] = []
        if normalize_audio:
            effects += ["gain", "-n"]

        effects += ["trim", str(float(start_seconds))]
        if duration is not None:
            effects.append(str(duration))

        fade_in = max(0.0, float(fade_in_seconds))
        fade_out = max(0.0, float(fade_out_seconds))
        if fade_in > 0 or fade_out > 0:
            play_duration = duration
            if play_duration is None:
                total_duration = self._read_duration_seconds(music_path)
                if total_duration is not None:
                    play_duration = total_duration - float(start_seconds)

            if play_duration is not None:
                if play_duration <= 0:
                    raise ValueError("Playback window is empty")
                # A fade cannot be longer than the audio that is played.
                fade_in = min(fade_in, play_duration)
                fade_out = min(fade_out, play_duration)
                if fade_out > 0:
                    effects += [
                        "fade",
                        "t",
                        str(fade_in),
                        str(play_duration),
                        str(fade_out),
                    ]
                else:
                    effects += ["fade", "t", str(fade_in)]
            else:
                # The fade-out form needs the window length as its stop
                # position, which is unknown here.
                if fade_out > 0:
                    logger.warning(
                        "fade_out_seconds ignored for %s because duration is unknown",
                        music_path,
                    )
                effects += ["fade", "t", str(fade_in)]

        if repeat_count > 0:
            effects += ["repeat", str(int(repeat_count))]
        return effects

    def _build_play_command(
        self,
        music_path: Path,
        volume_factor: float,
        effects: list[str],
        from_stdin: bool,
    ) -> list[str]:
        """Build the ``play`` argument list with exactly one input.

        The input is either ``music_path`` or, when ``from_stdin`` is true, a
        WAV stream on stdin. ``-v`` is placed directly before that single
        input so the volume applies to it.
        """
        cmd = ["play", "-q", "-v", str(volume_factor)]
        if from_stdin:
            cmd += ["-t", "wav", "-"]
        else:
            cmd.append(str(music_path))
        return cmd + list(effects)

    def play(
        self,
        music_file: str,
        start_seconds: float = 0.0,
        end_seconds: Optional[float] = None,
        volume: int = 80,
        fade_in_seconds: float = 0.0,
        fade_out_seconds: float = 0.0,
        repeat_count: int = 0,
        normalize_audio: bool = False,
    ) -> None:
        """Stop the current playback and start playing ``music_file``.

        Args:
            music_file: path relative to ``music_dir``.
            start_seconds: offset at which playback starts.
            end_seconds: offset at which playback ends; ``None`` plays to the
                end of the file.
            volume: percentage, clamped to the 0-100 range.
            fade_in_seconds: fade-in length; negative values count as 0.
            fade_out_seconds: fade-out length; negative values count as 0.
                Ignored (with a warning) when the duration is unknown.
            repeat_count: number of additional repetitions.
            normalize_audio: apply ``gain -n`` before the other effects.

        Raises:
            ValueError: path traversal, negative ``start_seconds`` or
                ``repeat_count``, ``end_seconds`` not after ``start_seconds``,
                or an empty playback window.
            FileNotFoundError: ``music_file`` does not exist.
            OSError: a player or decoder executable could not be started.
        """
        with self._lock:
            music_path = self._resolve_music_path(music_file)
            self.stop()

            if start_seconds < 0:
                raise ValueError("start_seconds must be greater than or equal to 0")
            if repeat_count < 0:
                raise ValueError("repeat_count must be greater than or equal to 0")

            duration: Optional[float] = None
            if end_seconds is not None:
                duration = float(end_seconds) - float(start_seconds)
                if duration <= 0:
                    raise ValueError("end_seconds must be greater than start_seconds")

            volume_factor = max(0.0, min(1.0, float(volume) / 100.0))
            effects = self._build_effects(
                music_path,
                start_seconds,
                duration,
                fade_in_seconds,
                fade_out_seconds,
                repeat_count,
                normalize_audio,
            )

            # If sox lacks mp3 handlers, decode with ffmpeg and stream wav to sox.
            use_decoder = music_path.suffix.lower() == ".mp3"
            cmd = self._build_play_command(
                music_path, volume_factor, effects, from_stdin=use_decoder
            )

            if not use_decoder:
                self._proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
                return

            decoder_cmd = [
                "ffmpeg",
                "-v",
                "error",
                "-i",
                str(music_path),
                "-f",
                "wav",
                "-",
            ]
            decoder = subprocess.Popen(
                decoder_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            )
            self._decoder_proc = decoder
            try:
                self._proc = subprocess.Popen(
                    cmd,
                    stdin=decoder.stdout,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except OSError:
                # The player could not be started: do not leave the decoder
                # running with nobody reading its output.
                self.stop()
                raise
            finally:
                # The player holds its own copy of the pipe; closing ours lets
                # the decoder see a broken pipe when the player exits.
                if decoder.stdout is not None:
                    decoder.stdout.close()

    def _terminate(self, proc: Optional[subprocess.Popen]) -> None:
        """Terminate ``proc`` if it is still running, killing it on timeout."""
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=self._STOP_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            try:
                proc.wait(timeout=self._STOP_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                logger.warning("Process %s did not exit after kill", proc.pid)

    def stop(self) -> None:
        """Stop the decoder and player processes of the current playback.

        Does nothing when no playback was started or it already finished.
        """
        with self._lock:
            self._terminate(self._decoder_proc)
            self._decoder_proc = None
            self._terminate(self._proc)
            self._proc = None