Browse Source

Ajout fonction update OTA

master
scayac 2 months ago
parent
commit
ce54aa4e76
  1. 44
      README.md
  2. 9
      platformio.ini
  3. 167
      src/main.cpp
  4. 583
      tools/espota.py

44
README.md

@ -77,6 +77,50 @@ Chaque module ESP32-S3 peut être identifié automatiquement via 3 GPIOs configu @@ -77,6 +77,50 @@ Chaque module ESP32-S3 peut être identifié automatiquement via 3 GPIOs configu
**Avantage** : Plus besoin de modifier le code pour chaque module, il suffit de configurer les jumpers !
### Mise à jour OTA (Over-The-Air)
Le module supporte la mise à jour distante via WiFi sans câble USB !
**Activation du mode OTA** :
1. **Appuyez simultanément sur les 4 boutons (A, B, C, D)** pendant **5 secondes**
2. 🔵 La **LED clignotera rapidement en bleu** → Mode OTA actif
3. Le module crée un **Point d'accès WiFi** avec :
- **SSID** : Nom du module (ex: `BleQuiz-5`)
- **Mot de passe** : `12345678`
- **IP** : `192.168.4.1`
**Effectuer la mise à jour** :
```bash
# Sur votre ordinateur, connectez-vous au WiFi du module, puis dans le répertoire tools, exécutez l'outil espota :
python3 espota.py -i 192.168.4.1 -p 3232 -f [CHEMIN_DU_FIRMWARE]/firmware.bin
```
### Mise à jour OTA via PlatformIO
Vous pouvez également effectuer la mise à jour OTA directement depuis PlatformIO :
1. Connectez le module à son réseau WiFi OTA (voir instructions ci-dessus).
2. Modifiez le fichier `platformio.ini` en décommantant la configuration OTA :
```ini
upload_protocol = espota
upload_port = 192.168.4.1
```
3. Lancez la commande suivante pour téléverser le firmware :
```bash
pio run -e esp32-s3-supermini --target upload
```
PlatformIO utilisera automatiquement le protocole OTA pour envoyer le firmware au module via le réseau WiFi.
**Remarque** : Assurez-vous que votre ordinateur est connecté au même réseau que le module ESP32-S3.
**Timeout de sécurité** :
- Si aucune mise à jour n'est reçue après **60 secondes**, le module redémarre automatiquement
- Cela évite que le module reste bloqué en mode OTA
**Indicateurs LED en mode OTA** :
- 🔵 **Bleu clignotant rapide** → En attente de mise à jour
- 🟢 **Vert fixe** → Mise à jour réussie (avant redémarrage)
- 🔴 **Rouge fixe** → Erreur lors de la mise à jour
### Appairage Bluetooth sécurisé
Le système utilise un appairage sécurisé avec code PIN pour une meilleure compatibilité :

9
platformio.ini

@ -2,10 +2,13 @@ @@ -2,10 +2,13 @@
platform = espressif32
board = esp32-s3-devkitc-1
framework = arduino
board_upload.flash_size = 4MB
board_build.partitions = default.csv
board_upload.flash_size = 8MB
board_build.partitions = default_8MB.csv
build_flags =
-DARDUINO_USB_CDC_ON_BOOT=1
-DBOARD_HAS_PSRAM
lib_deps =
adafruit/Adafruit NeoPixel@^1.12.0
adafruit/Adafruit NeoPixel@^1.12.0
ArduinoOTA
;upload_protocol = espota
;upload_port = 192.168.4.1

167
src/main.cpp

@ -6,6 +6,8 @@ @@ -6,6 +6,8 @@
#include <BLESecurity.h>
#include <esp_gap_ble_api.h>
#include <Adafruit_NeoPixel.h>
#include <WiFi.h>
#include <ArduinoOTA.h>
// Configuration LED WS2812
#define LED_PIN 48
@ -44,6 +46,10 @@ uint8_t gpioStates = 0; // Stocke l'état des GPIOs (bit à bit) @@ -44,6 +46,10 @@ uint8_t gpioStates = 0; // Stocke l'état des GPIOs (bit à bit)
unsigned long lastBlinkTime = 0;
bool blinkState = false;
bool gpioEnabled = false; // false = GPIO désactivés (résultats ou scores), true = GPIO actifs
unsigned long allButtonsStartTime = 0; // Temps du début de l'appui simultané
bool allButtonsPressed = false; // Détecte si tous les boutons sont appuyés
bool otaMode = false; // Mode mise à jour OTA activé
unsigned long otaModeStartTime = 0; // Temps du début du mode OTA
// Nom du module (sera déterminé automatiquement)
String moduleName = "BleQuiz-1";
@ -77,6 +83,62 @@ class CommandCallbacks: public BLECharacteristicCallbacks { @@ -77,6 +83,62 @@ class CommandCallbacks: public BLECharacteristicCallbacks {
}
};
// Fonction pour initialiser le mode OTA (mise à jour distante)
void setupOTA() {
// Arrêter BLE temporairement
BLEDevice::stopAdvertising();
// Démarrer WiFi en mode AP
WiFi.mode(WIFI_AP);
WiFi.softAP(moduleName.c_str(), "12345678");
Serial.print("Mode OTA activé - AP WiFi: ");
Serial.print(moduleName);
Serial.println(" | Mot de passe: 12345678");
Serial.print("IP AP: ");
Serial.println(WiFi.softAPIP());
// Configurer ArduinoOTA
ArduinoOTA.setHostname(moduleName.c_str());
ArduinoOTA.onStart([]() {
Serial.println("Démarrage de la mise à jour OTA...");
otaMode = true;
});
ArduinoOTA.onEnd([]() {
Serial.println("\nMise à jour OTA terminée!");
led.setPixelColor(0, COLOR_GREEN);
led.show();
delay(2000);
});
ArduinoOTA.onProgress([](unsigned int progress, unsigned int total) {
Serial.printf("Progression: %u%%\n", (progress / (total / 100)));
});
ArduinoOTA.onError([](ota_error_t error) {
Serial.printf("Erreur OTA [%u]: ", error);
if (error == OTA_AUTH_ERROR) {
Serial.println("Erreur d'authentification");
} else if (error == OTA_BEGIN_ERROR) {
Serial.println("Erreur de début");
} else if (error == OTA_CONNECT_ERROR) {
Serial.println("Erreur de connexion");
} else if (error == OTA_RECEIVE_ERROR) {
Serial.println("Erreur de réception");
} else if (error == OTA_END_ERROR) {
Serial.println("Erreur de fin");
}
led.setPixelColor(0, COLOR_RED);
led.show();
});
ArduinoOTA.begin();
otaModeStartTime = millis();
otaMode = true; // Activer le mode OTA immédiatement pour afficher le clignotement bleu
}
class MyServerCallbacks: public BLEServerCallbacks {
void onConnect(BLEServer* pServer) {
deviceConnected = true;
@ -227,6 +289,52 @@ void loop() { @@ -227,6 +289,52 @@ void loop() {
// Détecter les changements d'état des GPIO (passage à LOW)
// Uniquement si connecté ET GPIO activés
uint8_t newPresses = currentStates & ~gpioStates;
// Détection appui simultané des 4 boutons (ABCD) pour mode OTA
if (currentStates == 0x0F) { // Les 4 boutons sont pressés (bits 0-3)
if (!allButtonsPressed) {
// Premier appui simultané détecté
allButtonsPressed = true;
allButtonsStartTime = millis();
Serial.println("Appui ABCD détecté - en attente de 5 secondes...");
} else if ((millis() - allButtonsStartTime) >= 5000 && !otaMode) {
// 5 secondes écoulées, activer mode OTA
Serial.println("5 secondes d'appui ABCD détectées - Mode OTA activé!");
setupOTA();
}
} else {
// Les 4 boutons ne sont plus tous pressés
if (allButtonsPressed) {
allButtonsPressed = false;
Serial.println("Appui ABCD relâché");
}
}
// Si en mode OTA, gérer les appels OTA et le timeout
if (otaMode) {
ArduinoOTA.handle();
// Clignotement bleu rapide pour indiquer le mode OTA
unsigned long currentTime = millis();
if (currentTime - lastBlinkTime >= 200) { // Clignotement rapide (200ms)
lastBlinkTime = currentTime;
blinkState = !blinkState;
if (blinkState) {
led.setPixelColor(0, COLOR_BLUE);
} else {
led.setPixelColor(0, COLOR_OFF);
}
led.show();
}
// Timeout 60 secondes si pas de connexion WiFi
if ((millis() - otaModeStartTime) >= 60000) {
Serial.println("Timeout mode OTA - Redémarrage du module...");
delay(500);
ESP.restart();
}
} else {
// Mode normal (non-OTA)
if (newPresses != 0 && deviceConnected && gpioEnabled) {
// Allumer la LED selon le GPIO pressé
if (newPresses & (1 << 0)) { // GPIO 2
@ -248,37 +356,38 @@ void loop() { @@ -248,37 +356,38 @@ void loop() {
}
}
// Créer un message JSON avec l'état des GPIO
String message = "{\"module\":\"" + moduleName + "\",\"gpios\":[";
for (int i = 0; i < NUM_PINS; i++) {
message += "{\"pin\":" + String(GPIO_PINS[i]) + ",\"state\":";
message += ((currentStates & (1 << i)) ? "1" : "0");
message += "}";
if (i < NUM_PINS - 1) message += ",";
}
message += "]}";
// Mettre à jour la valeur de la caractéristique
pCharacteristic->setValue(message.c_str());
// Créer un message JSON avec l'état des GPIO
String message = "{\"module\":\"" + moduleName + "\",\"gpios\":[";
for (int i = 0; i < NUM_PINS; i++) {
message += "{\"pin\":" + String(GPIO_PINS[i]) + ",\"state\":";
message += ((currentStates & (1 << i)) ? "1" : "0");
message += "}";
if (i < NUM_PINS - 1) message += ",";
}
message += "]}";
// Mettre à jour la valeur de la caractéristique
pCharacteristic->setValue(message.c_str());
// Si l'état a changé et qu'un client est connecté, envoyer notification
if (currentStates != gpioStates && deviceConnected) {
gpioStates = currentStates;
pCharacteristic->notify();
Serial.println("État changé: " + message);
}
// Si l'état a changé et qu'un client est connecté, envoyer notification
if (currentStates != gpioStates && deviceConnected) {
gpioStates = currentStates;
pCharacteristic->notify();
Serial.println("État changé: " + message);
}
// Gestion de la reconnexion
if (!deviceConnected && oldDeviceConnected) {
delay(500);
pServer->startAdvertising();
Serial.println("Redémarrage de l'advertising");
oldDeviceConnected = deviceConnected;
}
if (deviceConnected && !oldDeviceConnected) {
oldDeviceConnected = deviceConnected;
}
// Gestion de la reconnexion
if (!deviceConnected && oldDeviceConnected) {
delay(500);
pServer->startAdvertising();
Serial.println("Redémarrage de l'advertising");
oldDeviceConnected = deviceConnected;
}
if (deviceConnected && !oldDeviceConnected) {
oldDeviceConnected = deviceConnected;
}
} // Fin du else (mode normal)
delay(100); // Délai pour éviter une lecture trop fréquente
}

583
tools/espota.py

@ -0,0 +1,583 @@ @@ -0,0 +1,583 @@
#!/usr/bin/env python
#
# Original espota.py by Ivan Grokhotkov:
# https://gist.github.com/igrr/d35ab8446922179dc58c
#
# Modified since 2015-09-18 from Pascal Gollor (https://github.com/pgollor)
# Modified since 2015-11-09 from Hristo Gochkov (https://github.com/me-no-dev)
# Modified since 2016-01-03 from Matthew O'Gorman (https://githumb.com/mogorman)
# Modified since 2025-09-04 from Lucas Saavedra Vaz (https://github.com/lucasssvaz)
#
# This script will push an OTA update to the ESP
# use it like:
# python espota.py -i <ESP_IP_addr> -I <Host_IP_addr> -p <ESP_port> -P <Host_port> [-a password] -f <sketch.bin>
# Or to upload SPIFFS image:
# python espota.py -i <ESP_IP_addr> -I <Host_IP_addr> -p <ESP_port> -P <HOST_port> [-a password] -s -f <spiffs.bin>
#
# Changes
# 2015-09-18:
# - Add option parser.
# - Add logging.
# - Send command to controller to differ between flashing and transmitting SPIFFS image.
#
# Changes
# 2015-11-09:
# - Added digest authentication
# - Enhanced error tracking and reporting
#
# Changes
# 2016-01-03:
# - Added more options to parser.
#
# Changes
# 2023-05-22:
# - Replaced the deprecated optparse module with argparse.
# - Adjusted the code style to conform to PEP 8 guidelines.
# - Used with statement for file handling to ensure proper resource cleanup.
# - Incorporated exception handling to catch and handle potential errors.
# - Made variable names more descriptive for better readability.
# - Introduced constants for better code maintainability.
#
# Changes
# 2025-09-04:
# - Changed authentication to use PBKDF2-HMAC-SHA256 for challenge/response
#
# Changes
# 2025-09-18:
# - Fixed authentication when using old images with MD5 passwords
#
# Changes
# 2025-10-07:
# - Fixed authentication when images might use old MD5 hashes stored in the firmware
from __future__ import print_function
import socket
import sys
import os
import argparse
import logging
import hashlib
import random
# Commands
FLASH = 0
SPIFFS = 100
AUTH = 200
# Constants
PROGRESS_BAR_LENGTH = 60
# update_progress(): Displays or updates a console progress bar
def update_progress(progress):
if PROGRESS:
status = ""
if isinstance(progress, int):
progress = float(progress)
if not isinstance(progress, float):
progress = 0
status = "Error: progress var must be float\r\n"
if progress < 0:
progress = 0
status = "Halt...\r\n"
if progress >= 1:
progress = 1
status = "Done...\r\n"
block = int(round(PROGRESS_BAR_LENGTH * progress))
text = "\rUploading: [{0}] {1}% {2}".format(
"=" * block + " " * (PROGRESS_BAR_LENGTH - block), int(progress * 100), status
)
sys.stderr.write(text)
sys.stderr.flush()
else:
sys.stderr.write(".")
sys.stderr.flush()
def send_invitation_and_get_auth_challenge(remote_addr, remote_port, message):
"""
Send invitation to ESP device and get authentication challenge.
Returns (success, auth_data, error_message) tuple.
"""
remote_address = (remote_addr, int(remote_port))
inv_tries = 0
data = ""
msg = "Sending invitation to %s " % remote_addr
sys.stderr.write(msg)
sys.stderr.flush()
while inv_tries < 10:
inv_tries += 1
sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sent = sock2.sendto(message.encode(), remote_address) # noqa: F841
except: # noqa: E722
sys.stderr.write("failed\n")
sys.stderr.flush()
sock2.close()
return False, None, "Host %s Not Found" % remote_addr
sock2.settimeout(TIMEOUT)
try:
# Try to read up to 69 bytes for new protocol (SHA256)
# If device sends less (37 bytes), it's using old MD5 protocol
data = sock2.recv(69).decode()
sock2.close()
break
except: # noqa: E722
sys.stderr.write(".")
sys.stderr.flush()
sock2.close()
sys.stderr.write("\n")
sys.stderr.flush()
if inv_tries == 10:
return False, None, "No response from the ESP"
return True, data, None
def authenticate(
remote_addr, remote_port, password, use_md5_password, use_old_protocol, filename, content_size, file_md5, nonce
):
"""
Perform authentication with the ESP device.
Args:
use_md5_password: If True, hash password with MD5 instead of SHA256
use_old_protocol: If True, use old MD5 challenge/response protocol (pre-3.3.1)
Returns (success, error_message) tuple.
"""
cnonce_text = "%s%u%s%s" % (filename, content_size, file_md5, remote_addr)
remote_address = (remote_addr, int(remote_port))
if use_old_protocol:
# Generate client nonce (cnonce)
cnonce = hashlib.md5(cnonce_text.encode()).hexdigest()
# Old MD5 challenge/response protocol (pre-3.3.1)
# 1. Hash the password with MD5
password_hash = hashlib.md5(password.encode()).hexdigest()
# 2. Create challenge response
challenge = "%s:%s:%s" % (password_hash, nonce, cnonce)
response = hashlib.md5(challenge.encode()).hexdigest()
expected_response_length = 32
else:
# Generate client nonce (cnonce) using SHA256 for new protocol
cnonce = hashlib.sha256(cnonce_text.encode()).hexdigest()
# New PBKDF2-HMAC-SHA256 challenge/response protocol (3.3.1+)
# The password can be hashed with either MD5 or SHA256
if use_md5_password:
# Use MD5 for password hash (for devices that stored MD5 hashes)
password_hash = hashlib.md5(password.encode()).hexdigest()
else:
# Use SHA256 for password hash (recommended)
password_hash = hashlib.sha256(password.encode()).hexdigest()
# 2. Derive key using PBKDF2-HMAC-SHA256 with the password hash
salt = nonce + ":" + cnonce
derived_key = hashlib.pbkdf2_hmac("sha256", password_hash.encode(), salt.encode(), 10000)
derived_key_hex = derived_key.hex()
# 3. Create challenge response
challenge = derived_key_hex + ":" + nonce + ":" + cnonce
response = hashlib.sha256(challenge.encode()).hexdigest()
expected_response_length = 64
# Send authentication response
sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
message = "%d %s %s\n" % (AUTH, cnonce, response)
sock2.sendto(message.encode(), remote_address)
sock2.settimeout(10)
try:
data = sock2.recv(expected_response_length).decode()
except: # noqa: E722
sock2.close()
return False, "No Answer to our Authentication"
if data != "OK":
sock2.close()
return False, data
sock2.close()
return True, None
except Exception as e:
sock2.close()
return False, str(e)
def serve( # noqa: C901
remote_addr, local_addr, remote_port, local_port, password, md5_target, filename, command=FLASH
):
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (local_addr, local_port)
logging.info("Starting on %s:%s", str(server_address[0]), str(server_address[1]))
try:
sock.bind(server_address)
sock.listen(1)
except Exception as e:
logging.error("Listen Failed: %s", str(e))
return 1
content_size = os.path.getsize(filename)
with open(filename, "rb") as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
logging.info("Upload size: %d", content_size)
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)
# Send invitation and get authentication challenge
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message)
if not success:
logging.error(error)
return 1
if data != "OK":
if data.startswith("AUTH"):
nonce = data.split()[1]
nonce_length = len(nonce)
# Detect protocol version based on nonce length:
# - 32 chars = Old MD5 protocol (pre-3.3.1)
# - 64 chars = New SHA256 protocol (3.3.1+)
if nonce_length == 32:
# Scenario 1: Old device (pre-3.3.1) using MD5 protocol
logging.info("Detected old MD5 protocol (pre-3.3.1)")
sys.stderr.write("Authenticating (MD5 protocol)...")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=True,
use_old_protocol=True,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
if not auth_success:
sys.stderr.write("FAIL\n")
logging.error("Authentication Failed: %s", auth_error)
logging.error("Please check your password and try again")
return 1
sys.stderr.write("OK\n")
logging.warning("====================================================================")
logging.warning("WARNING: Device is using old MD5 authentication protocol (pre-3.3.1)")
logging.warning("Please update to ESP32 Arduino Core 3.3.1+ for improved security.")
logging.warning("======================================================================")
elif nonce_length == 64:
# New protocol (3.3.1+) - try SHA256 password first, then MD5 if it fails
# Scenario 2: Try SHA256 password hash first (recommended for new devices)
if md5_target:
# User explicitly requested MD5 password hash
logging.info("Using MD5 password hash as requested")
sys.stderr.write("Authenticating (SHA256 protocol with MD5 password)...")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=True,
use_old_protocol=False,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
if auth_success:
logging.warning("Using insecure MD5 hash for password due to legacy device support")
logging.warning("Please upgrade devices to ESP32 Arduino Core 3.3.1+ for improved security")
else:
# Try SHA256 password hash first
sys.stderr.write("Authenticating (PBKDF2-HMAC-SHA256)...\n")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=False,
use_old_protocol=False,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
# Scenario 3: If SHA256 fails, try MD5 password hash (for devices with stored MD5 passwords)
if not auth_success:
sys.stderr.write("FAIL\n")
logging.info("SHA256 password failed, trying MD5 password hash")
sys.stderr.write("Retrying with MD5 password...\n")
sys.stderr.flush()
# Device is back in OTA_IDLE after auth failure, need to send new invitation
success, data, error = send_invitation_and_get_auth_challenge(remote_addr, remote_port, message)
if not success:
sys.stderr.write("FAIL\n")
logging.error("Failed to get new challenge for MD5 retry: %s", error)
return 1
if not data.startswith("AUTH"):
sys.stderr.write("FAIL\n")
logging.error("Expected AUTH challenge for MD5 retry, got: %s", data)
return 1
# Get new nonce for second attempt
nonce = data.split()[1]
sys.stderr.write("Authenticating (MD5)...\n")
sys.stderr.flush()
auth_success, auth_error = authenticate(
remote_addr,
remote_port,
password,
use_md5_password=True,
use_old_protocol=False,
filename=filename,
content_size=content_size,
file_md5=file_md5,
nonce=nonce,
)
if auth_success:
logging.warning("====================================================================")
logging.warning("WARNING: Device authenticated with MD5 password hash (deprecated)")
logging.warning("MD5 is cryptographically broken and should not be used.")
logging.warning(
"Please update your sketch to use either setPassword() or setPasswordHash()"
)
logging.warning(
"with SHA256, then upload again to migrate to the new secure SHA256 protocol."
)
logging.warning("======================================================================")
if not auth_success:
sys.stderr.write("FAIL\n")
logging.error("Authentication Failed: %s", auth_error)
logging.error("Please check your password and try again")
return 1
sys.stderr.write("OK\n")
else:
logging.error("Invalid nonce length: %d (expected 32 or 64)", nonce_length)
return 1
else:
logging.error("Bad Answer: %s", data)
return 1
logging.info("Waiting for device...")
try:
sock.settimeout(10)
connection, client_address = sock.accept()
sock.settimeout(None)
connection.settimeout(None)
except: # noqa: E722
logging.error("No response from device")
sock.close()
return 1
try:
with open(filename, "rb") as f:
if PROGRESS:
update_progress(0)
else:
sys.stderr.write("Uploading")
sys.stderr.flush()
offset = 0
while True:
chunk = f.read(1024)
if not chunk:
break
offset += len(chunk)
update_progress(offset / float(content_size))
connection.settimeout(10)
try:
connection.sendall(chunk)
res = connection.recv(10)
response_text = res.decode().strip()
last_response_contained_ok = "OK" in response_text
logging.debug("Chunk response: '%s'", response_text)
except Exception as e:
sys.stderr.write("\n")
logging.error("Error Uploading: %s", str(e))
connection.close()
return 1
if last_response_contained_ok:
logging.info("Success")
connection.close()
return 0
sys.stderr.write("\n")
logging.info("Waiting for result...")
count = 0
received_any_response = False
while count < 10: # Increased from 5 to 10 attempts
count += 1
connection.settimeout(30) # Reduced from 60s to 30s per attempt
try:
data = connection.recv(32).decode().strip()
received_any_response = True
logging.info("Result attempt %d: '%s'", count, data)
if "OK" in data:
logging.info("Success")
connection.close()
return 0
elif data: # Got some response but not OK
logging.warning("Unexpected response from device: '%s'", data)
except socket.timeout:
logging.debug("Timeout waiting for result (attempt %d/10)", count)
continue
except Exception as e:
logging.debug("Error receiving result (attempt %d/10): %s", count, str(e))
# Don't return error here, continue trying
continue
# After all attempts, provide detailed error information
if received_any_response:
logging.warning(
"Upload completed but device sent unexpected response(s). This may still be successful."
)
logging.warning("Device might be rebooting to apply firmware - this is normal.")
connection.close()
return 0 # Consider it successful if we got any response and upload completed
else:
logging.error("No response from device after upload completion")
logging.error("This could indicate device reboot (normal) or network issues")
connection.close()
return 1
except Exception as e: # noqa: E722
logging.error("Error: %s", str(e))
finally:
connection.close()
sock.close()
return 1
def parse_args(unparsed_args):
parser = argparse.ArgumentParser(description="Transmit image over the air to the ESP32 module with OTA support.")
# destination ip and port
parser.add_argument("-i", "--ip", dest="esp_ip", action="store", help="ESP32 IP Address.", default=False)
parser.add_argument("-I", "--host_ip", dest="host_ip", action="store", help="Host IP Address.", default="0.0.0.0")
parser.add_argument("-p", "--port", dest="esp_port", type=int, help="ESP32 OTA Port. Default: 3232", default=3232)
parser.add_argument(
"-P",
"--host_port",
dest="host_port",
type=int,
help="Host server OTA Port. Default: random 10000-60000",
default=random.randint(10000, 60000),
)
# authentication
parser.add_argument("-a", "--auth", dest="auth", help="Set authentication password.", action="store", default="")
parser.add_argument(
"-m",
"--md5-target",
dest="md5_target",
help=(
"Use MD5 for password hashing (for devices with stored MD5 passwords). "
"By default, SHA256 is tried first, then MD5 as fallback."
),
action="store_true",
default=False,
)
# image
parser.add_argument("-f", "--file", dest="image", help="Image file.", metavar="FILE", default=None)
parser.add_argument(
"-s",
"--spiffs",
dest="spiffs",
action="store_true",
help="Transmit a SPIFFS image and do not flash the module.",
default=False,
)
# output
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
help="Show debug output. Overrides loglevel with debug.",
default=False,
)
parser.add_argument(
"-r",
"--progress",
dest="progress",
action="store_true",
help="Show progress output. Does not work for Arduino IDE.",
default=False,
)
parser.add_argument(
"-t",
"--timeout",
dest="timeout",
type=int,
help="Timeout to wait for the ESP32 to accept invitation.",
default=10,
)
return parser.parse_args(unparsed_args)
def main(args):
options = parse_args(args)
log_level = logging.WARNING
if options.debug:
log_level = logging.DEBUG
logging.basicConfig(level=log_level, format="%(asctime)-8s [%(levelname)s]: %(message)s", datefmt="%H:%M:%S")
logging.debug("Options: %s", str(options))
# check options
global PROGRESS
PROGRESS = options.progress
global TIMEOUT
TIMEOUT = options.timeout
if not options.esp_ip or not options.image:
logging.critical("Not enough arguments.")
return 1
command = FLASH
if options.spiffs:
command = SPIFFS
return serve(
options.esp_ip,
options.host_ip,
options.esp_port,
options.host_port,
options.auth,
options.md5_target,
options.image,
command,
)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
Loading…
Cancel
Save