Daniel Svitan 2f3ccf14db
All checks were successful
Gitea Build Action / build (push) Successful in 29s
🐛 Fixes cannot update state
2025-06-01 12:30:42 +02:00

179 lines
5.0 KiB
Python

import utime
import ujson
import network
import urequests
from machine import Pin
# Measured distance (cm) at or above which the door counts as opened.
THRESHOLD_DISTANCE = 15
# Speed of sound in cm/s: 340 m/s scaled by 100 cm per metre.
SOUND_SPEED = 100 * 340
# Number of Wi-Fi polling rounds allowed before the connection attempt is abandoned.
MAX_CONNECTION_RETRIES = 50
# Consecutive "opened" readings required before the server is notified.
ULTRA_OPENED_THRESHOLD = 3
class App:
    """Door alarm: measures distance with an ultrasonic sensor and reports the door state.

    Configuration (Wi-Fi credentials, server address and token) is read from
    ``.env.json`` when the instance is created. ``run()`` connects to Wi-Fi and
    then loops forever, measuring the distance, driving the LED and posting
    state changes to the server.

    NOTE(review): the code relies on a builtin ``exit``; confirm it is available
    on the target MicroPython firmware.
    """

    # Wi-Fi credentials and server settings, filled in by load_env().
    ssid: str
    password: str
    server: str
    token: str

    # Door state from the current and the previous iteration of the run() loop.
    opened: bool = False
    previously_opened: bool = False
    # Number of consecutive readings at or beyond THRESHOLD_DISTANCE.
    ultra_opened_counter: int = 0

    # Upper bound (microseconds) for each wait on the echo pin, so that a
    # missing echo cannot hang the device forever.
    # NOTE(review): 50 ms is presumably longer than the sensor's longest echo
    # pulse - confirm against the sensor's datasheet.
    ECHO_TIMEOUT_US: int = 50_000

    led = Pin(15, Pin.OUT)
    trigger = Pin(2, Pin.OUT)
    echo = Pin(3, Pin.IN)

    def __init__(self):
        """Load the configuration from ``.env.json``."""
        print("Loading .env.json...")
        self.load_env()

    def load_env(self):
        """Read ``.env.json`` and populate ssid, password, server and token.

        The file must contain a ``network`` object with ``ssid`` and
        ``password`` and a ``server`` object with ``address`` and ``token``.
        Any missing value or read/parse error prints a message and calls
        ``exit(1)``.
        """
        try:
            with open(".env.json") as stream:
                data = ujson.load(stream)

            # Named *_cfg so the imported `network` module is not shadowed.
            network_cfg = data.get("network")
            if network_cfg is None:
                print("'network' missing in env")
                exit(1)

            server_cfg = data.get("server")
            if server_cfg is None:
                print("'server' missing in env")
                exit(1)

            self.ssid = network_cfg.get("ssid")
            self.password = network_cfg.get("password")
            self.server = server_cfg.get("address")
            self.token = server_cfg.get("token")

            if None in (self.ssid, self.password, self.server, self.token):
                print("Missing properties in env")
                exit(1)
        except Exception as e:
            print(f"Error loading .env.json: {e}")
            exit(1)

    def connect(self):
        """Connect to Wi-Fi, blinking the LED while waiting.

        The interface is power-cycled before connecting. Each retry round
        toggles the LED four times at 250 ms intervals; every tenth round
        the connection request is issued again. After MAX_CONNECTION_RETRIES
        rounds the program exits. On success the LED blinks quickly, the IP
        address is printed and the current state is pushed to the server.
        """
        print("Connecting to Wi-Fi...")
        wlan = network.WLAN(network.STA_IF)

        # Power-cycle the interface before connecting.
        wlan.active(False)
        utime.sleep_ms(250)
        wlan.active(True)
        utime.sleep_ms(250)

        wlan.connect(self.ssid, self.password)
        utime.sleep_ms(100)

        retry_count = 0
        while not wlan.isconnected():
            if retry_count >= MAX_CONNECTION_RETRIES:
                print("Max connection retries reached")
                exit(1)

            print(f"Waiting for connection{(retry_count % 3 + 1) * '.'}{3 * ' '}", end="\r")
            retry_count += 1
            for _ in range(4):
                self.led.toggle()
                utime.sleep_ms(250)

            if retry_count % 10 == 0:
                print("Attempting to restart connection...")
                wlan.connect(self.ssid, self.password)

        # Quick blink to signal a successful connection.
        for _ in range(10):
            self.led.toggle()
            utime.sleep_ms(50)

        print(f"Connected with IP {wlan.ifconfig()[0]}")
        self.update_server()

    def health_check_server(self):
        """GET the server root and print the status code; errors are only printed."""
        print("Health checking server...", end="\r")
        response = None
        try:
            response = urequests.get(f"{self.server}/")
            # Trailing spaces overwrite the longer progress message printed above.
            print(f"Server healthy [{response.status_code}]{' ' * 8}")
        except Exception as e:
            print(f"Error occurred: {e}")
        finally:
            # Always release the underlying socket; leaking it on every
            # request eventually exhausts the device's resources.
            if response is not None:
                response.close()

    def update_server(self):
        """POST the current ``opened`` flag as JSON to ``<server>/open``.

        The token is sent in the ``Authorization`` header. Errors are printed,
        not raised.
        """
        print("Updating state...", end="\r")
        raw = ujson.dumps({"opened": self.opened})
        response = None
        try:
            response = urequests.post(
                f"{self.server}/open",
                headers={"Authorization": self.token, "Content-Type": "application/json"},
                data=raw
            )
            print(f"State updated [{response.status_code}] {response.content.decode()}")
        except Exception as e:
            print(f"Error occurred: {e}")
        finally:
            # Always release the underlying socket (see health_check_server).
            if response is not None:
                response.close()

    def measure_distance(self):
        """Trigger the ultrasonic sensor and return the measured distance in cm.

        Returns:
            The distance computed from the echo pulse width and SOUND_SPEED,
            halved because the pulse travels to the obstacle and back.

        Raises:
            OSError: if the echo pin does not go high, or does not go low
                again, within ECHO_TIMEOUT_US microseconds.
        """
        timeout_us = self.ECHO_TIMEOUT_US

        # Emit a 10 us trigger pulse after making sure the line starts low.
        self.trigger.low()
        utime.sleep_us(2)
        self.trigger.high()
        utime.sleep_us(10)
        self.trigger.low()

        # Wait for the echo pin to go high; the last sample taken while it
        # is still low marks the start of the pulse.
        wait_started = utime.ticks_us()
        sent_time = wait_started
        while self.echo.value() == 0:
            sent_time = utime.ticks_us()
            if utime.ticks_diff(sent_time, wait_started) > timeout_us:
                raise OSError("Ultrasonic echo pulse did not start")

        # Wait for the echo pin to go low again; the last sample taken while
        # it is still high marks the end of the pulse.
        received_time = utime.ticks_us()
        while self.echo.value() == 1:
            received_time = utime.ticks_us()
            if utime.ticks_diff(received_time, sent_time) > timeout_us:
                raise OSError("Ultrasonic echo pulse did not end")

        # ticks_diff handles wrap-around of the tick counter, which a plain
        # subtraction does not.
        delta_time = utime.ticks_diff(received_time, sent_time)
        distance = delta_time / 1_000_000 * SOUND_SPEED / 2
        print(f"Distance: {distance} cm")
        return distance

    def run(self):
        """Connect to Wi-Fi and run the measurement loop forever.

        Each iteration measures the distance, sets ``opened``, drives the LED
        and notifies the server. Once ``i`` reaches 20 the LED blinks and the
        server is health-checked; otherwise the loop sleeps for 500 ms.
        """
        print("Starting door alarm...")
        self.connect()

        i = 0
        while True:
            try:
                distance = self.measure_distance()
                self.opened = distance >= THRESHOLD_DISTANCE

                if not self.opened:
                    self.led.low()
                    self.ultra_opened_counter = 0
                    # Report the transition from opened to closed.
                    if self.previously_opened:
                        self.update_server()
                else:
                    self.led.high()
                    self.ultra_opened_counter += 1
                    # NOTE(review): with `>=` the state is re-sent on every
                    # reading once the threshold is reached - confirm that
                    # repeated updates are intended.
                    if self.ultra_opened_counter >= ULTRA_OPENED_THRESHOLD:
                        self.update_server()

                if i >= 20:
                    # Heartbeat blink, then health check; both sleeps add up
                    # to the same 500 ms as the regular branch.
                    self.led.toggle()
                    utime.sleep_ms(100)
                    self.led.toggle()
                    utime.sleep_ms(400)
                    i = 0
                    self.health_check_server()
                else:
                    utime.sleep_ms(500)
            except Exception as e:
                print(f"Fatal exception occurred: {e}")
                # Back off so a persistent fault (e.g. a sensor timeout)
                # does not turn the loop into a busy spin.
                utime.sleep_ms(500)

            self.previously_opened = self.opened
            i += 1
# Script entry point: build the application and start its endless loop.
if __name__ == "__main__":
    door_alarm = App()
    door_alarm.run()