Browse Source

Mise à jour dépendances python

master
scayac 1 month ago
parent
commit
0b66adab5e
  1. 4
      .gitignore
  2. BIN
      backend/app/__pycache__/serial_listener.cpython-312.pyc
  3. 105
      backend/app/serial_listener.py
  4. 5
      backend/requirements.txt

4
.gitignore vendored

@ -1 +1,3 @@ @@ -1 +1,3 @@
.venv
.venv
__pycache__
*.pyc

BIN
backend/app/__pycache__/serial_listener.cpython-312.pyc

Binary file not shown.

105
backend/app/serial_listener.py

@ -1,10 +1,62 @@ @@ -1,10 +1,62 @@
from __future__ import annotations
import logging
import os
import select
import threading
import termios
from typing import Callable, Dict, Optional, TypedDict
import serial
class SerialConnectionError(Exception):
pass
_BAUDRATE_TO_TERM = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
230400: termios.B230400,
}
def _configure_port(fd: int, baudrate: int) -> None:
speed = _BAUDRATE_TO_TERM.get(baudrate)
if speed is None:
raise SerialConnectionError(f"Unsupported baudrate: {baudrate}")
attrs = termios.tcgetattr(fd)
attrs[0] = 0
attrs[1] = 0
attrs[2] = termios.CS8 | termios.CREAD | termios.CLOCAL
attrs[3] = 0
attrs[4] = speed
attrs[5] = speed
attrs[6][termios.VMIN] = 0
attrs[6][termios.VTIME] = 0
termios.tcflush(fd, termios.TCIFLUSH)
termios.tcsetattr(fd, termios.TCSANOW, attrs)
def _open_serial_fd(port: str, baudrate: int) -> int:
try:
fd = os.open(port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as exc:
raise SerialConnectionError(f"Cannot open serial port {port}: {exc}") from exc
try:
_configure_port(fd, baudrate)
except Exception:
os.close(fd)
raise
return fd
class TriggerLogInfo(TypedDict, total=False):
@ -48,25 +100,40 @@ class SerialTriggerListener: @@ -48,25 +100,40 @@ class SerialTriggerListener:
self._logger.info("Starting serial listener on %s @ %s", port, baudrate)
while not self._stop_event.is_set():
try:
with serial.Serial(port=port, baudrate=baudrate, timeout=timeout) as serial_conn:
fd = _open_serial_fd(port=port, baudrate=baudrate)
line_buffer = bytearray()
try:
while not self._stop_event.is_set():
line = serial_conn.readline().decode("utf-8", errors="ignore").strip()
if not line:
readable, _, _ = select.select([fd], [], [], timeout)
if not readable:
continue
trigger_info = self.on_trigger(line)
if trigger_info and trigger_info.get("key") and trigger_info.get("name"):
self._logger.info(
"Serial trigger received: %s -> %s (%s)",
line,
trigger_info["key"],
trigger_info["name"],
)
elif trigger_info and trigger_info.get("key"):
self._logger.info("Serial trigger received: %s -> %s", line, trigger_info["key"])
elif trigger_info and trigger_info.get("name"):
self._logger.info("Serial trigger received: %s (%s)", line, trigger_info["name"])
else:
self._logger.info("Serial trigger received: %s", line)
except serial.SerialException as exc:
chunk = os.read(fd, 256)
if not chunk:
continue
line_buffer.extend(chunk)
while b"\n" in line_buffer:
raw_line, _, remainder = line_buffer.partition(b"\n")
line_buffer = bytearray(remainder)
line = raw_line.decode("utf-8", errors="ignore").strip()
if not line:
continue
trigger_info = self.on_trigger(line)
if trigger_info and trigger_info.get("key") and trigger_info.get("name"):
self._logger.info(
"Serial trigger received: %s -> %s (%s)",
line,
trigger_info["key"],
trigger_info["name"],
)
elif trigger_info and trigger_info.get("key"):
self._logger.info("Serial trigger received: %s -> %s", line, trigger_info["key"])
elif trigger_info and trigger_info.get("name"):
self._logger.info("Serial trigger received: %s (%s)", line, trigger_info["name"])
else:
self._logger.info("Serial trigger received: %s", line)
finally:
os.close(fd)
except (OSError, SerialConnectionError) as exc:
self._logger.warning("Serial connection error: %s", exc)
self._stop_event.wait(2)

5
backend/requirements.txt

@ -1,3 +1,2 @@ @@ -1,3 +1,2 @@
fastapi==0.116.1
uvicorn[standard]==0.35.0
pyserial==3.5
fastapi==0.135.2
uvicorn[standard]==0.42.0

Loading…
Cancel
Save