-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
138 lines (113 loc) · 3.9 KB
/
main.py
File metadata and controls
138 lines (113 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# import modules
import network
import socket
import time
from machine import Pin, I2C
from umqtt.simple import MQTTClient
import bme280
import json
# LED configuration
led = Pin("LED", Pin.OUT)
def blink_led(interval, count):
for _ in range(count):
led.on()
time.sleep(interval)
led.off()
time.sleep(interval)
# Load configuration from file
def load_config():
try:
with open("config.json", "r") as file:
config = json.load(file)
return config
except Exception as e:
print("Failed to load configuration:", e)
return None
# Load configuration
config = load_config()
if not config:
print("Exiting due to configuration error.")
blink_led(0.1, 10) # Rapid blinking to indicate config error
blink_led(1, 10) # softly blinking to indicate config error
blink_led(0.1, 10) # Rapid blinking to indicate config error
raise SystemExit
# Wi-Fi configuration
wifi_ssid = config["wifi"]["ssid"]
wifi_password = config["wifi"]["password"]
# MQTT broker configuration
mqtt_broker = config["mqtt"]["broker"]
mqtt_port = config["mqtt"]["port"]
mqtt_username = config["mqtt"]["username"]
mqtt_password = config["mqtt"]["password"]
mqtt_temperature_topic = config["mqtt"]["topics"]["temperature"]
mqtt_pressure_topic = config["mqtt"]["topics"]["pressure"]
mqtt_humidity_topic = config["mqtt"]["topics"]["humidity"]
# update interval configuration
update_interval = config["update_interval"]["time"] // 2
# MQTT client configuration
mqtt_client_id = config["mqtt"]["client_id"]
client = MQTTClient(
mqtt_client_id,
mqtt_broker,
mqtt_port,
mqtt_username,
mqtt_password,
keepalive=(update_interval * 2 + 50),
)
# BME280 sensor configuration
i2c = I2C(id=0, scl=Pin(1), sda=Pin(0), freq=100000)
bme = bme280.BME280(i2c=i2c)
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
while not wlan.isconnected():
try:
print("Connecting to Wi-Fi...")
wlan.connect(wifi_ssid, wifi_password)
time.sleep(4) # Wait for 4 seconds before checking connection status
except Exception as e:
print("Wi-Fi connection failed:", e)
blink_led(0.1, 10) # Rapid blinking to indicate Wi-Fi error
time.sleep(10) # Wait for 10 seconds before retrying
print("Connected to Wi-Fi:", wlan.ifconfig())
def connect_mqtt():
while True:
try:
client.connect()
print("Connected to MQTT broker")
break
except Exception as e:
print("MQTT connection failed:", e)
blink_led(0.5, 5) # Slow blinking to indicate MQTT error
time.sleep(30) # Wait for 30 seconds before retrying
def publish_data(topic, data, retain=True):
try:
client.publish(topic, data, retain=retain)
print("Published data to {}: {}".format(topic, data))
except Exception as e:
print("Failed to publish data:", e)
blink_led(0.5, 5) # Slow blinking to indicate MQTT publish error
connect_mqtt() # Reconnect to MQTT broker
def read_sensor():
try:
temperature, pressure, humidity = bme.read_compensated_data()
return temperature / 100, round(pressure / 25600, 2), round(humidity / 1024, 2)
except Exception as e:
print("Failed to read sensor data:", e)
blink_led(0.2, 20) # Fast blinking to indicate sensor error
return None, None, None
def main():
connect_wifi()
connect_mqtt()
while True:
temperature, pressure, humidity = read_sensor()
publish_data(mqtt_temperature_topic, str(temperature))
publish_data(mqtt_pressure_topic, str(pressure))
publish_data(mqtt_humidity_topic, str(humidity))
for _ in range(update_interval):
led.on()
time.sleep(1)
led.off()
time.sleep(1)
if __name__ == "__main__":
main()