🔨 Refactors code
All checks were successful
Gitea Build Action / build (push) Successful in 22s

This commit is contained in:
Daniel Svitan 2025-06-01 12:06:01 +02:00
parent fe8cfceab2
commit 329253b1f4

View File

@ -2,14 +2,14 @@ import utime
import ujson import ujson
import network import network
import urequests import urequests
import machine
from machine import Pin from machine import Pin
threshold = 15 # cm THRESHOLD_DISTANCE = 15 # cm
sound_speed = 340 * 100 # m/s * centi = cm/s SOUND_SPEED = 340 * 100 # m/s * centi = cm/s
MAX_CONNECTION_RETRIES = 50
ULTRA_OPENED_THRESHOLD = 3
# TODO: add if open for more than 1 minute, send alert
class App: class App:
ssid: str ssid: str
password: str password: str
@ -19,7 +19,6 @@ class App:
opened: bool = False opened: bool = False
previously_opened: bool = False previously_opened: bool = False
ultra_opened_counter: int = 0 ultra_opened_counter: int = 0
ultra_opened_threshold: int = 3
led = Pin(15, Pin.OUT) led = Pin(15, Pin.OUT)
trigger = Pin(2, Pin.OUT) trigger = Pin(2, Pin.OUT)
@ -27,6 +26,10 @@ class App:
def __init__(self): def __init__(self):
print("Loading .env.json...") print("Loading .env.json...")
self.load_env()
def load_env(self):
try:
with open(".env.json") as stream: with open(".env.json") as stream:
data = ujson.load(stream) data = ujson.load(stream)
@ -45,16 +48,17 @@ class App:
self.server = server.get("address") self.server = server.get("address")
self.token = server.get("token") self.token = server.get("token")
if self.ssid is None or \ if None in (self.ssid, self.password, self.server, self.token):
self.password is None or \ print("Missing properties in env")
self.server is None or \ exit(1)
self.token is None: except Exception as e:
print("missing properties in env") print(f"Error loading .env.json: {e}")
exit(1) exit(1)
def connect(self): def connect(self):
print("Connecting to wifi...") print("Connecting to Wi-Fi...")
wlan = network.WLAN(network.STA_IF) wlan = network.WLAN(network.STA_IF)
wlan.active(False) wlan.active(False)
utime.sleep_ms(250) utime.sleep_ms(250)
wlan.active(True) wlan.active(True)
@ -63,18 +67,23 @@ class App:
wlan.connect(self.ssid, self.password) wlan.connect(self.ssid, self.password)
utime.sleep_ms(100) utime.sleep_ms(100)
i = 0 retry_count = 0
while not wlan.isconnected(): while not wlan.isconnected():
print(f"Waiting for connection{(i % 3 + 1) * "."}{3 * " "}", end="\r") if retry_count >= MAX_CONNECTION_RETRIES:
i += 1 print("Max connection retries reached")
exit(1)
print(f"Waiting for connection{(retry_count % 3 + 1) * '.'}{3 * ' '}", end="\r")
retry_count += 1
for _ in range(4): for _ in range(4):
self.led.toggle() self.led.toggle()
utime.sleep_ms(250) utime.sleep_ms(250)
if i % 10 == 0: if retry_count % 10 == 0:
print("Attempting to restart connection...") print("Attempting to restart connection...")
wlan.connect(self.ssid, self.password) wlan.connect(self.ssid, self.password)
for i in range(10): for _ in range(10):
self.led.toggle() self.led.toggle()
utime.sleep_ms(50) utime.sleep_ms(50)
@ -83,68 +92,64 @@ class App:
def update_server(self): def update_server(self):
print("Updating state...", end="\r") print("Updating state...", end="\r")
data = {"opened": self.opened} data = {"opened": self.opened}
raw = ujson.dumps(data) raw = ujson.dumps(data)
try: try:
r = urequests.post(f"{self.server}/open", r = urequests.post(
f"{self.server}/open",
headers={"Authorization": self.token, "Content-Type": "application/json"}, headers={"Authorization": self.token, "Content-Type": "application/json"},
data=raw) data=raw
)
print(f"State updated [{r.status_code}]") print(f"State updated [{r.status_code}]")
except Exception as e: except Exception as e:
print(f"Error occurred: {e}") print(f"Error occurred: {e}")
def ultra(self): def measure_distance(self):
self.trigger.low() self.trigger.low()
utime.sleep_us(2) utime.sleep_us(2)
self.trigger.high() self.trigger.high()
utime.sleep_us(10) utime.sleep_us(10)
self.trigger.low() self.trigger.low()
# echo goes high as soon as sound wave is sent
# and returns to low once sound gets back
sent_time = utime.ticks_us() sent_time = utime.ticks_us()
while self.echo.value() == 0: while self.echo.value() == 0:
sent_time = utime.ticks_us() # us sent_time = utime.ticks_us()
received_time = utime.ticks_us() received_time = utime.ticks_us()
while self.echo.value() == 1: while self.echo.value() == 1:
received_time = utime.ticks_us() # us received_time = utime.ticks_us()
delta_time = received_time - sent_time # us delta_time = received_time - sent_time
distance = delta_time / 1_000_000 * sound_speed / 2 distance = delta_time / 1_000_000 * SOUND_SPEED / 2
print(f"distance: {distance} cm") print(f"Distance: {distance} cm")
if distance < threshold: return distance
# door closed
self.led.low()
self.ultra_opened_counter = 0
# was it closed just now?
if self.previously_opened:
self.update_server()
self.previously_opened = False
else:
# door opened
self.led.high()
self.ultra_opened_counter += 1
# was it opened just now?
if not self.previously_opened:
self.update_server()
self.previously_opened = True
def run(self): def run(self):
print("Starting door alarm...") print("Starting door alarm...")
self.connect() self.connect()
i = 0 i = 0
# T = 500ms f = 2 Hz
while True: while True:
try: try:
self.ultra() distance = self.measure_distance()
self.opened = distance >= THRESHOLD_DISTANCE
if i >= 20: # every 10 seconds, blink for 100ms if not opened:
self.led.low()
self.ultra_opened_counter = 0
if self.previously_opened:
self.update_server()
else:
self.led.high()
self.ultra_opened_counter += 1
if self.ultra_opened_counter >= ULTRA_OPENED_THRESHOLD:
self.update_server()
if i >= 20:
self.led.toggle() self.led.toggle()
utime.sleep_ms(100) utime.sleep_ms(100)
self.led.toggle() self.led.toggle()
@ -152,11 +157,12 @@ class App:
i = 0 i = 0
else: else:
utime.sleep_ms(500) utime.sleep_ms(500)
except Exception as e: except Exception as e:
print(f"Fatal exception occurred: {e}") print(f"Fatal exception occurred: {e}")
self.previously_opened = self.opened
i += 1 i += 1
if __name__ == "__main__": if __name__ == "__main__":
App().run() App().run()