🔨 Moves to an Object Oriented structure
All checks were successful
Gitea Build Action / build (push) Successful in 24s

This commit is contained in:
Daniel Svitan 2025-06-01 11:30:37 +02:00
parent 96abe93c3e
commit fe8cfceab2
2 changed files with 141 additions and 109 deletions

1
peripheral/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.env.json

View File

@ -5,127 +5,158 @@ import urequests
import machine
from machine import Pin
led = Pin(15, Pin.OUT)
trigger = Pin(2, Pin.OUT)
echo = Pin(3, Pin.IN)
sound = 340 * 100 # m/s * centi = cm/s
threshold = 15 # cm
sound_speed = 340 * 100 # m/s * centi = cm/s
print("Starting door alarm...")
ssid = "HUAWEI-2EEt-2G"
password = "hxtU2dvx"
server = "https://192.168.100.16:5031"
token = ""
ultra_counter = 0
ultra_threshold = 3
# opened state from previous iteration
previous_opened = False
# TODO: add if open for more than 1 minute, send alert
class App:
ssid: str
password: str
server: str
token: str
opened: bool = False
previously_opened: bool = False
ultra_opened_counter: int = 0
ultra_opened_threshold: int = 3
def connect():
print("Connecting...")
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
utime.sleep_ms(250)
wlan.active(True)
utime.sleep_ms(250)
led = Pin(15, Pin.OUT)
trigger = Pin(2, Pin.OUT)
echo = Pin(3, Pin.IN)
wlan.connect(ssid, password)
utime.sleep_ms(100)
def __init__(self):
print("Loading .env.json...")
with open(".env.json") as stream:
data = ujson.load(stream)
i = 0
while not wlan.isconnected():
print(f"Waiting for connection{(i % 3 + 1) * "."}{3 * " "}", end="\r")
i += 1
for _ in range(4):
led.toggle()
utime.sleep_ms(250)
network = data.get("network")
if network is None:
print("'network' missing in env")
exit(1)
if i % 10 == 0:
print("Attempting to restart connection...")
wlan.connect(ssid, password)
for i in range(10):
led.toggle()
utime.sleep_ms(50)
server = data.get("server")
if server is None:
print("'server' missing in env")
exit(1)
print(f"Connected with IP {wlan.ifconfig()[0]}")
# update server to default value
send_req(False)
self.ssid = network.get("ssid")
self.password = network.get("password")
self.server = server.get("address")
self.token = server.get("token")
if self.ssid is None or \
self.password is None or \
self.server is None or \
self.token is None:
print("missing properties in env")
exit(1)
def send_req(opened: bool):
print("Updating state...", end="\r")
def connect(self):
print("Connecting to wifi...")
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
utime.sleep_ms(250)
wlan.active(True)
utime.sleep_ms(250)
data = {"opened": opened}
raw = ujson.dumps(data)
try:
r = urequests.post(f"{server}/write",
headers={"Authorization": token, "Content-Type": "application/json"},
data=raw)
print(f"State updated [{r.status_code}]")
except Exception as e:
print(f"Error occurred: {e}")
def ultra():
global ultra_counter, previous_opened
trigger.low()
utime.sleep_us(2)
trigger.high()
utime.sleep_us(10)
trigger.low()
# echo goes high as soon as sound wave is sent
# and returns to low once sound gets back
while echo.value() == 0:
sent_time = utime.ticks_us() # us
while echo.value() == 1:
received_time = utime.ticks_us() # us
delta_time = received_time - sent_time # us
distance = delta_time / 1_000_000 * sound / 2
print(f"distance: {distance} cm")
if distance < threshold:
# door closed
led.low()
ultra_counter = 0
# was it closed just now?
if previous_opened:
send_req(False)
previous_opened = False
else:
# door opened
led.high()
ultra_counter += 1
# was it opened just now?
if not previous_opened:
send_req(True)
previous_opened = True
connect()
i = 0
# T = 500ms f = 2 Hz
while True:
ultra()
if i == 20: # every 10 seconds, blink for 100ms
led.toggle()
wlan.connect(self.ssid, self.password)
utime.sleep_ms(100)
led.toggle()
utime.sleep_ms(400)
i = 0
else:
utime.sleep_ms(500)
i += 1
i = 0
while not wlan.isconnected():
print(f"Waiting for connection{(i % 3 + 1) * "."}{3 * " "}", end="\r")
i += 1
for _ in range(4):
self.led.toggle()
utime.sleep_ms(250)
if i % 10 == 0:
print("Attempting to restart connection...")
wlan.connect(self.ssid, self.password)
for i in range(10):
self.led.toggle()
utime.sleep_ms(50)
print(f"Connected with IP {wlan.ifconfig()[0]}")
self.update_server()
def update_server(self):
print("Updating state...", end="\r")
data = {"opened": self.opened}
raw = ujson.dumps(data)
try:
r = urequests.post(f"{self.server}/open",
headers={"Authorization": self.token, "Content-Type": "application/json"},
data=raw)
print(f"State updated [{r.status_code}]")
except Exception as e:
print(f"Error occurred: {e}")
def ultra(self):
self.trigger.low()
utime.sleep_us(2)
self.trigger.high()
utime.sleep_us(10)
self.trigger.low()
# echo goes high as soon as sound wave is sent
# and returns to low once sound gets back
sent_time = utime.ticks_us()
while self.echo.value() == 0:
sent_time = utime.ticks_us() # us
received_time = utime.ticks_us()
while self.echo.value() == 1:
received_time = utime.ticks_us() # us
delta_time = received_time - sent_time # us
distance = delta_time / 1_000_000 * sound_speed / 2
print(f"distance: {distance} cm")
if distance < threshold:
# door closed
self.led.low()
self.ultra_opened_counter = 0
# was it closed just now?
if self.previously_opened:
self.update_server()
self.previously_opened = False
else:
# door opened
self.led.high()
self.ultra_opened_counter += 1
# was it opened just now?
if not self.previously_opened:
self.update_server()
self.previously_opened = True
def run(self):
print("Starting door alarm...")
self.connect()
i = 0
# T = 500ms f = 2 Hz
while True:
try:
self.ultra()
if i >= 20: # every 10 seconds, blink for 100ms
self.led.toggle()
utime.sleep_ms(100)
self.led.toggle()
utime.sleep_ms(400)
i = 0
else:
utime.sleep_ms(500)
except Exception as e:
print(f"Fatal exception occurred: {e}")
i += 1
if __name__ == "__main__":
App().run()