You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

175 lines
6.6 KiB

from __future__ import annotations
import logging
import shutil
import subprocess
import threading
from pathlib import Path
from typing import Optional
logger = logging.getLogger("pysonnerie.audio")
class AudioPlayer:
def __init__(self, music_dir: Path):
self.music_dir = music_dir
self._lock = threading.RLock()
self._proc: Optional[subprocess.Popen] = None
self._decoder_proc: Optional[subprocess.Popen] = None
def _resolve_music_path(self, music_file: str) -> Path:
candidate = (self.music_dir / music_file).resolve()
if self.music_dir.resolve() not in candidate.parents:
raise ValueError("music_file path traversal is forbidden")
if not candidate.exists() or not candidate.is_file():
raise FileNotFoundError(f"Music file not found: {music_file}")
return candidate
def _read_duration_seconds(self, music_path: Path) -> Optional[float]:
try:
result = subprocess.run(
["soxi", "-D", str(music_path)],
capture_output=True,
check=True,
text=True,
)
return float(result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
logger.warning("Unable to read duration with soxi for %s", music_path)
# Fallback for setups where sox lacks some codecs (e.g. mp3).
try:
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(music_path),
],
capture_output=True,
check=True,
text=True,
)
return float(result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
logger.warning("Unable to read duration with ffprobe for %s", music_path)
return None
def play(
self,
music_file: str,
start_seconds: float = 0.0,
end_seconds: Optional[float] = None,
volume: int = 80,
fade_in_seconds: float = 0.0,
fade_out_seconds: float = 0.0,
repeat_count: int = 0,
normalize_audio: bool = False,
) -> None:
with self._lock:
music_path = self._resolve_music_path(music_file)
self.stop()
if start_seconds < 0:
raise ValueError("start_seconds must be greater than or equal to 0")
if repeat_count < 0:
raise ValueError("repeat_count must be greater than or equal to 0")
duration: Optional[float] = None
if end_seconds is not None:
duration = float(end_seconds) - float(start_seconds)
if duration <= 0:
raise ValueError("end_seconds must be greater than start_seconds")
volume_factor = max(0.0, min(1.0, float(volume) / 100.0))
base_cmd = [
"play",
"-q",
"-v",
str(volume_factor),
]
effects: list[str] = []
if normalize_audio:
effects += ["gain", "-n"]
effects += ["trim", str(float(start_seconds))]
if duration is not None:
effects.append(str(duration))
fade_in = max(0.0, float(fade_in_seconds))
fade_out = max(0.0, float(fade_out_seconds))
if fade_in > 0 or fade_out > 0:
play_duration = duration
if play_duration is None:
total_duration = self._read_duration_seconds(music_path)
if total_duration is not None:
play_duration = total_duration - float(start_seconds)
if play_duration is not None:
if play_duration <= 0:
raise ValueError("Playback window is empty")
fade_in = min(fade_in, play_duration)
fade_out = min(fade_out, play_duration)
if fade_out > 0:
effects += ["fade", "t", str(fade_in), str(play_duration), str(fade_out)]
else:
effects += ["fade", "t", str(fade_in)]
else:
if fade_out > 0:
logger.warning(
"fade_out_seconds ignored for %s because duration is unknown",
music_path,
)
effects += ["fade", "t", str(fade_in)]
if repeat_count > 0:
effects += ["repeat", str(int(repeat_count))]
# If sox lacks mp3 handlers, decode with ffmpeg and stream wav to sox.
if music_path.suffix.lower() == ".mp3" and shutil.which("ffmpeg"):
decoder_cmd = ["ffmpeg", "-v", "error", "-i", str(music_path), "-f", "wav", "-"]
self._decoder_proc = subprocess.Popen(
decoder_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
cmd = base_cmd + ["-t", "wav", "-"] + effects
self._proc = subprocess.Popen(
cmd,
stdin=self._decoder_proc.stdout,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if self._decoder_proc.stdout is not None:
self._decoder_proc.stdout.close()
else:
if music_path.suffix.lower() == ".mp3":
logger.warning("ffmpeg unavailable, falling back to direct sox playback for %s", music_path)
cmd = base_cmd + [str(music_path)] + effects
self._proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def stop(self) -> None:
with self._lock:
if self._decoder_proc and self._decoder_proc.poll() is None:
self._decoder_proc.terminate()
try:
self._decoder_proc.wait(timeout=2)
except subprocess.TimeoutExpired:
self._decoder_proc.kill()
self._decoder_proc = None
if self._proc and self._proc.poll() is None:
self._proc.terminate()
try:
self._proc.wait(timeout=2)
except subprocess.TimeoutExpired:
self._proc.kill()
self._proc = None