Skip to content

Instantly share code, notes, and snippets.

@darkmanlv
Last active February 2, 2026 21:20
Show Gist options
  • Select an option

  • Save darkmanlv/045e7d277fd9a224f45804dc3ba14cb2 to your computer and use it in GitHub Desktop.

Select an option

Save darkmanlv/045e7d277fd9a224f45804dc3ba14cb2 to your computer and use it in GitHub Desktop.
DHT22 (using CircuitPython DHT) + Raspberry Pi + ThingSpeak for data storage
# systemd unit that runs the DHT22 -> ThingSpeak uploader script as a service.
[Unit]
Description=DHT22 -> ThingSpeak uploader
# Start only once the network is reported online, since the script uploads over HTTPS.
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
# NOTE(review): this account presumably has to own/read the home folder used in
# the paths below - adjust both together.
User=darkman
WorkingDirectory=/home/your_home_folder/TempHum
# Python block-buffers stdout when it is not attached to a terminal; disable
# buffering so every print() line reaches the journal immediately.
Environment=PYTHONUNBUFFERED=1
ExecStart=/home/your_home_folder/dht-venv/bin/python /home/your_home_folder/TempHum/dht22_thingspeak.py
# Restart the script 10 seconds after any exit.
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
#!/usr/bin/env python3
import time
import socket
import urllib.request
import urllib.error
import board
import adafruit_dht
# ---------------- CONFIG ----------------
# API key substituted into the ThingSpeak update URL below; replace the placeholder.
API_KEY = "your_api_key"
# Board pin the DHT22 data line is connected to.
DHT_PIN = board.D4 # GPIO4 (BCM)
# Seconds main() sleeps between read/upload cycles.
DELAY_SECONDS = 180 # 3 minutes
# ThingSpeak fields:
# field1 = temp C
# field2 = temp F
# field3 = humidity
# field4 = status (1 ok / 0 fail)
# Note: as written, main() uploads only after a successful read, so field4 is
# always sent as 1.
# URL template filled in by send_to_thingspeak() via str.format().
TS_URL_FMT = (
"https://api.thingspeak.com/update?api_key={key}"
"&field1={t_c}&field2={t_f}&field3={rh}&field4={status}"
)
# ----------------------------------------
# Init DHT22
# Module-level sensor object; read_sensor() reads from it. Created at import time.
dht = adafruit_dht.DHT22(DHT_PIN)
def c_to_f(t_c: float) -> float:
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    return (t_c * 9.0 / 5.0) + 32.0
def read_sensor(max_attempts=6, sleep_between=2.0):
    """
    Read humidity and temperature from the module-level DHT22 device.

    CircuitPython DHT throws RuntimeError on bad reads, so the read is
    retried. The pause is taken only between attempts, not after the last one.

    Args:
        max_attempts: maximum number of read attempts.
        sleep_between: seconds to wait between consecutive attempts.

    Returns:
        (humidity, temp_c) as floats, or (None, None) if no attempt produced
        both values.
    """
    last_error = None
    for attempt in range(max_attempts):
        if attempt:
            # Wait before a retry; skipping this on the first pass means no
            # pointless delay is added after the final failed attempt.
            time.sleep(sleep_between)
        try:
            t_c = dht.temperature
            rh = dht.humidity
        except RuntimeError as err:
            # Remember the failure so it can be reported instead of vanishing.
            last_error = err
            continue
        if t_c is not None and rh is not None:
            return float(rh), float(t_c)
    if last_error is not None:
        print(f"Sensor read error: {last_error}")
    return None, None
def send_to_thingspeak(t_c: float, t_f: float, rh: float, status: int, timeout=10) -> str:
    """
    Upload one reading to ThingSpeak with a GET request built from TS_URL_FMT.

    Args:
        t_c: temperature in degrees Celsius (sent as field1, two decimals).
        t_f: temperature in degrees Fahrenheit (sent as field2, two decimals).
        rh: relative humidity (sent as field3, two decimals).
        status: status flag (sent as field4).
        timeout: timeout in seconds passed to urlopen.

    Returns:
        The response body decoded as UTF-8 and stripped. Per the ThingSpeak
        write API this is the new entry id, or "0" when the update was
        rejected.

    Raises:
        urllib.error.URLError (and other errors from urlopen) on network or
        HTTP failure.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    url = TS_URL_FMT.format(
        # Percent-encode the key so stray characters cannot corrupt the query string.
        key=quote(API_KEY, safe=""),
        t_c=f"{t_c:.2f}",
        t_f=f"{t_f:.2f}",
        rh=f"{rh:.2f}",
        status=str(status),
    )
    req = urllib.request.Request(url, method="GET")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace").strip()
def main():
    """
    Run the read/upload loop forever.

    Each cycle reads the sensor, uploads the values to ThingSpeak when the
    read succeeded, logs the outcome with print(), then sleeps
    DELAY_SECONDS. Upload errors are logged and do not stop the loop.
    """
    print("DHT22 -> ThingSpeak service started")
    print(f"Update interval: {DELAY_SECONDS} seconds")
    try:
        while True:
            rh, t_c = read_sensor()
            if rh is None or t_c is None:
                print("Sensor read FAILED (no upload)")
            else:
                t_f = c_to_f(t_c)
                print(f"TempC={t_c:.2f} TempF={t_f:.2f} Humidity={rh:.2f}")
                try:
                    entry_id = send_to_thingspeak(t_c, t_f, rh, status=1)
                except Exception as e:
                    # Exception already covers URLError and socket.timeout;
                    # a failed upload must never kill the service loop.
                    print(f"ThingSpeak send FAILED: {e}")
                else:
                    if entry_id == "0":
                        # ThingSpeak answers "0" when it rejects an update,
                        # so this is not a real entry id.
                        print("ThingSpeak send FAILED: update rejected (response 0)")
                    else:
                        print(f"ThingSpeak entry id: {entry_id}")
            time.sleep(DELAY_SECONDS)
    finally:
        # Release the sensor when the loop ends (Ctrl-C or an unexpected
        # error) so the next start can claim the pin again.
        dht.exit()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment