diff --git a/peripheral/main.py b/peripheral/main.py index 97a24ae..b77137e 100644 --- a/peripheral/main.py +++ b/peripheral/main.py @@ -2,14 +2,14 @@ import utime import ujson import network import urequests -import machine from machine import Pin -threshold = 15 # cm -sound_speed = 340 * 100 # m/s * centi = cm/s +THRESHOLD_DISTANCE = 15 # cm +SOUND_SPEED = 340 * 100 # m/s * centi = cm/s +MAX_CONNECTION_RETRIES = 50 +ULTRA_OPENED_THRESHOLD = 3 -# TODO: add if open for more than 1 minute, send alert class App: ssid: str password: str @@ -19,7 +19,6 @@ class App: opened: bool = False previously_opened: bool = False ultra_opened_counter: int = 0 - ultra_opened_threshold: int = 3 led = Pin(15, Pin.OUT) trigger = Pin(2, Pin.OUT) @@ -27,34 +26,39 @@ class App: def __init__(self): print("Loading .env.json...") - with open(".env.json") as stream: - data = ujson.load(stream) + self.load_env() - network = data.get("network") - if network is None: - print("'network' missing in env") - exit(1) + def load_env(self): + try: + with open(".env.json") as stream: + data = ujson.load(stream) - server = data.get("server") - if server is None: - print("'server' missing in env") - exit(1) + network = data.get("network") + if network is None: + print("'network' missing in env") + exit(1) - self.ssid = network.get("ssid") - self.password = network.get("password") - self.server = server.get("address") - self.token = server.get("token") + server = data.get("server") + if server is None: + print("'server' missing in env") + exit(1) - if self.ssid is None or \ - self.password is None or \ - self.server is None or \ - self.token is None: - print("missing properties in env") - exit(1) + self.ssid = network.get("ssid") + self.password = network.get("password") + self.server = server.get("address") + self.token = server.get("token") + + if None in (self.ssid, self.password, self.server, self.token): + print("Missing properties in env") + exit(1) + except Exception as e: + print(f"Error loading .env.json: {e}") + exit(1) def connect(self): - print("Connecting to wifi...") + print("Connecting to Wi-Fi...") wlan = network.WLAN(network.STA_IF) + wlan.active(False) utime.sleep_ms(250) wlan.active(True) @@ -63,88 +67,89 @@ class App: wlan.connect(self.ssid, self.password) utime.sleep_ms(100) - i = 0 + retry_count = 0 while not wlan.isconnected(): - print(f"Waiting for connection{(i % 3 + 1) * "."}{3 * " "}", end="\r") - i += 1 - for _ in range(4): - self.led.toggle() - utime.sleep_ms(250) + if retry_count >= MAX_CONNECTION_RETRIES: + print("Max connection retries reached") + exit(1) - if i % 10 == 0: - print("Attempting to restart connection...") - wlan.connect(self.ssid, self.password) - for i in range(10): - self.led.toggle() - utime.sleep_ms(50) + print(f"Waiting for connection{(retry_count % 3 + 1) * '.'}{3 * ' '}", end="\r") + retry_count += 1 + + for _ in range(4): + self.led.toggle() + utime.sleep_ms(250) + + if retry_count % 10 == 0: + print("Attempting to restart connection...") + wlan.connect(self.ssid, self.password) + for _ in range(10): + self.led.toggle() + utime.sleep_ms(50) print(f"Connected with IP {wlan.ifconfig()[0]}") self.update_server() def update_server(self): print("Updating state...", end="\r") - data = {"opened": self.opened} raw = ujson.dumps(data) + try: - r = urequests.post(f"{self.server}/open", - headers={"Authorization": self.token, "Content-Type": "application/json"}, - data=raw) + r = urequests.post( + f"{self.server}/open", + headers={"Authorization": self.token, "Content-Type": "application/json"}, + data=raw + ) print(f"State updated [{r.status_code}]") except Exception as e: print(f"Error occurred: {e}") - def ultra(self): + def measure_distance(self): self.trigger.low() utime.sleep_us(2) self.trigger.high() utime.sleep_us(10) self.trigger.low() - # echo goes high as soon as sound wave is sent - # and returns to low once sound gets back sent_time = utime.ticks_us() while self.echo.value() == 0: - sent_time = utime.ticks_us() # us + sent_time = utime.ticks_us() received_time = utime.ticks_us() while self.echo.value() == 1: - received_time = utime.ticks_us() # us + received_time = utime.ticks_us() - delta_time = received_time - sent_time # us - distance = delta_time / 1_000_000 * sound_speed / 2 - print(f"distance: {distance} cm") + delta_time = received_time - sent_time + distance = delta_time / 1_000_000 * SOUND_SPEED / 2 + print(f"Distance: {distance} cm") - if distance < threshold: - # door closed - self.led.low() - self.ultra_opened_counter = 0 - - # was it closed just now? - if self.previously_opened: - self.update_server() - self.previously_opened = False - else: - # door opened - self.led.high() - self.ultra_opened_counter += 1 - - # was it opened just now? - if not self.previously_opened: - self.update_server() - self.previously_opened = True + return distance def run(self): print("Starting door alarm...") self.connect() i = 0 - # T = 500ms f = 2 Hz while True: try: - self.ultra() + distance = self.measure_distance() + self.opened = distance >= THRESHOLD_DISTANCE - if i >= 20: # every 10 seconds, blink for 100ms + if not opened: + self.led.low() + self.ultra_opened_counter = 0 + + if self.previously_opened: + self.update_server() + else: + self.led.high() + self.ultra_opened_counter += 1 + + if self.ultra_opened_counter >= ULTRA_OPENED_THRESHOLD: + self.update_server() + + if i >= 20: self.led.toggle() utime.sleep_ms(100) self.led.toggle() @@ -152,11 +157,12 @@ class App: i = 0 else: utime.sleep_ms(500) + except Exception as e: print(f"Fatal exception occurred: {e}") + self.previously_opened = self.opened i += 1 - if __name__ == "__main__": App().run()