From fe8cfceab23e12bd2a9c76c097fcb1d6053bed42 Mon Sep 17 00:00:00 2001 From: Daniel Svitan Date: Sun, 1 Jun 2025 11:30:37 +0200 Subject: [PATCH] :hammer: Moves to an Object Oriented structure --- peripheral/.gitignore | 1 + peripheral/main.py | 249 ++++++++++++++++++++++++------------------ 2 files changed, 141 insertions(+), 109 deletions(-) create mode 100644 peripheral/.gitignore diff --git a/peripheral/.gitignore b/peripheral/.gitignore new file mode 100644 index 0000000..b7869dd --- /dev/null +++ b/peripheral/.gitignore @@ -0,0 +1 @@ +.env.json diff --git a/peripheral/main.py b/peripheral/main.py index 8e48c16..97a24ae 100644 --- a/peripheral/main.py +++ b/peripheral/main.py @@ -5,127 +5,158 @@ import urequests import machine from machine import Pin -led = Pin(15, Pin.OUT) -trigger = Pin(2, Pin.OUT) -echo = Pin(3, Pin.IN) - -sound = 340 * 100 # m/s * centi = cm/s threshold = 15 # cm +sound_speed = 340 * 100 # m/s * centi = cm/s -print("Starting door alarm...") -ssid = "HUAWEI-2EEt-2G" -password = "hxtU2dvx" -server = "https://192.168.100.16:5031" -token = "" - -ultra_counter = 0 -ultra_threshold = 3 -# opened state from previous iteration -previous_opened = False # TODO: add if open for more than 1 minute, send alert +class App: + ssid: str + password: str + server: str + token: str + opened: bool = False + previously_opened: bool = False + ultra_opened_counter: int = 0 + ultra_opened_threshold: int = 3 -def connect(): - print("Connecting...") - wlan = network.WLAN(network.STA_IF) - wlan.active(False) - utime.sleep_ms(250) - wlan.active(True) - utime.sleep_ms(250) + led = Pin(15, Pin.OUT) + trigger = Pin(2, Pin.OUT) + echo = Pin(3, Pin.IN) - wlan.connect(ssid, password) - utime.sleep_ms(100) + def __init__(self): + print("Loading .env.json...") + with open(".env.json") as stream: + data = ujson.load(stream) - i = 0 - while not wlan.isconnected(): - print(f"Waiting for connection{(i % 3 + 1) * "."}{3 * " "}", end="\r") - i += 1 - for _ in range(4): - led.toggle() - utime.sleep_ms(250) + network = data.get("network") + if network is None: + print("'network' missing in env") + exit(1) - if i % 10 == 0: - print("Attempting to restart connection...") - wlan.connect(ssid, password) - for i in range(10): - led.toggle() - utime.sleep_ms(50) + server = data.get("server") + if server is None: + print("'server' missing in env") + exit(1) - print(f"Connected with IP {wlan.ifconfig()[0]}") - # update server to default value - send_req(False) + self.ssid = network.get("ssid") + self.password = network.get("password") + self.server = server.get("address") + self.token = server.get("token") + if self.ssid is None or \ + self.password is None or \ + self.server is None or \ + self.token is None: + print("missing properties in env") + exit(1) -def send_req(opened: bool): - print("Updating state...", end="\r") + def connect(self): + print("Connecting to wifi...") + wlan = network.WLAN(network.STA_IF) + wlan.active(False) + utime.sleep_ms(250) + wlan.active(True) + utime.sleep_ms(250) - data = {"opened": opened} - raw = ujson.dumps(data) - try: - r = urequests.post(f"{server}/write", - headers={"Authorization": token, "Content-Type": "application/json"}, - data=raw) - print(f"State updated [{r.status_code}]") - except Exception as e: - print(f"Error occurred: {e}") - - -def ultra(): - global ultra_counter, previous_opened - - trigger.low() - utime.sleep_us(2) - trigger.high() - utime.sleep_us(10) - trigger.low() - - # echo goes high as soon as sound wave is sent - # and returns to low once sound gets back - while echo.value() == 0: - sent_time = utime.ticks_us() # us - - while echo.value() == 1: - received_time = utime.ticks_us() # us - - delta_time = received_time - sent_time # us - distance = delta_time / 1_000_000 * sound / 2 - print(f"distance: {distance} cm") - - if distance < threshold: - # door closed - led.low() - ultra_counter = 0 - - # was it closed just now? - if previous_opened: - send_req(False) - previous_opened = False - else: - # door opened - led.high() - ultra_counter += 1 - - # was it opened just now? - if not previous_opened: - send_req(True) - previous_opened = True - - -connect() - -i = 0 -# T = 500ms f = 2 Hz -while True: - ultra() - - if i == 20: # every 10 seconds, blink for 100ms - led.toggle() + wlan.connect(self.ssid, self.password) utime.sleep_ms(100) - led.toggle() - utime.sleep_ms(400) - i = 0 - else: - utime.sleep_ms(500) - i += 1 + i = 0 + while not wlan.isconnected(): + print(f"Waiting for connection{(i % 3 + 1) * "."}{3 * " "}", end="\r") + i += 1 + for _ in range(4): + self.led.toggle() + utime.sleep_ms(250) + + if i % 10 == 0: + print("Attempting to restart connection...") + wlan.connect(self.ssid, self.password) + for i in range(10): + self.led.toggle() + utime.sleep_ms(50) + + print(f"Connected with IP {wlan.ifconfig()[0]}") + self.update_server() + + def update_server(self): + print("Updating state...", end="\r") + + data = {"opened": self.opened} + raw = ujson.dumps(data) + try: + r = urequests.post(f"{self.server}/open", + headers={"Authorization": self.token, "Content-Type": "application/json"}, + data=raw) + print(f"State updated [{r.status_code}]") + except Exception as e: + print(f"Error occurred: {e}") + + def ultra(self): + self.trigger.low() + utime.sleep_us(2) + self.trigger.high() + utime.sleep_us(10) + self.trigger.low() + + # echo goes high as soon as sound wave is sent + # and returns to low once sound gets back + sent_time = utime.ticks_us() + while self.echo.value() == 0: + sent_time = utime.ticks_us() # us + + received_time = utime.ticks_us() + while self.echo.value() == 1: + received_time = utime.ticks_us() # us + + delta_time = received_time - sent_time # us + distance = delta_time / 1_000_000 * sound_speed / 2 + print(f"distance: {distance} cm") + + if distance < threshold: + # door closed + self.led.low() + self.ultra_opened_counter = 0 + + # was it closed just now? + if self.previously_opened: + self.update_server() + self.previously_opened = False + else: + # door opened + self.led.high() + self.ultra_opened_counter += 1 + + # was it opened just now? + if not self.previously_opened: + self.update_server() + self.previously_opened = True + + def run(self): + print("Starting door alarm...") + self.connect() + + i = 0 + # T = 500ms f = 2 Hz + while True: + try: + self.ultra() + + if i >= 20: # every 10 seconds, blink for 100ms + self.led.toggle() + utime.sleep_ms(100) + self.led.toggle() + utime.sleep_ms(400) + i = 0 + else: + utime.sleep_ms(500) + except Exception as e: + print(f"Fatal exception occurred: {e}") + + i += 1 + + +if __name__ == "__main__": + App().run()