Skip to content

Instantly share code, notes, and snippets.

@atanasyanew
Last active January 7, 2026 08:29
Show Gist options
  • Select an option

  • Save atanasyanew/8cbb71f2a960365da17038b64eb9fa21 to your computer and use it in GitHub Desktop.

Select an option

Save atanasyanew/8cbb71f2a960365da17038b64eb9fa21 to your computer and use it in GitHub Desktop.
RPI Pimoroni Enviro plus
#!/usr/bin/env python3
"""
Extended Enviro+ example for the Pimoroni Enviro+ board.
This script is derived from Pimoroni's ``all-in-one.py`` example but has two
important differences:
1. Rather than requiring you to wave your hand over the proximity/light
sensor to cycle through each sensor reading, the display will
automatically rotate between the built-in sensors at a configurable
interval.
2. Each set of readings is published to an MQTT broker. You can point the
broker at a local Mosquitto instance or any other MQTT server. The
payload is a JSON dictionary containing the temperature, pressure,
humidity, light level and gas/particulate readings along with the
Raspberry Pi's serial number as ``serial``. This makes it easy to
integrate the Enviro+ into a home automation system.
Usage
-----
.. code:: bash
python3 all_in_one_mqtt.py --broker localhost --topic enviroplus \
--rotate-interval 10 --publish-interval 15
The MQTT parameters are optional. If you omit ``--broker`` the script will
default to ``localhost`` on port ``1883``. ``--rotate-interval`` controls
how long (in seconds) each sensor reading is shown on the LCD before
advancing to the next. ``--publish-interval`` controls how often the
script publishes readings to MQTT; if omitted it defaults to the value of
``--rotate-interval``.
This script requires the ``paho-mqtt`` library to be installed. You can
install it with ``pip3 install paho-mqtt``.
"""
import argparse
import colorsys
import json
import logging
import sys
import time
import st7735
try:
# Transitional fix for breaking change in LTR559
from ltr559 import LTR559 # type: ignore
ltr559 = LTR559()
except ImportError:
# Fall back to legacy import on older versions of enviroplus
import ltr559 # type: ignore
from bme280 import BME280 # type: ignore
from pms5003 import PMS5003, ReadTimeoutError as pmsReadTimeoutError # type: ignore
from enviroplus import gas # type: ignore
from fonts.ttf import RobotoMedium as UserFont # type: ignore
from PIL import Image, ImageDraw, ImageFont
try:
import paho.mqtt.client as mqtt # type: ignore
except ImportError as e:
print("The paho-mqtt package is required to publish sensor data.\n"
"Install it with 'pip3 install paho-mqtt' before running this script.")
raise
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
def get_serial_number(cpuinfo_path: str = "/proc/cpuinfo") -> str:
    """Return the Raspberry Pi serial number.

    The serial is taken from the first line of ``cpuinfo_path`` that starts
    with ``Serial``. The result is used to build the MQTT client ID and is
    published as ``serial`` in the payload.

    Args:
        cpuinfo_path: File to scan for the ``Serial`` line. Defaults to
            ``/proc/cpuinfo``.

    Returns:
        The second ``:``-separated field of the ``Serial`` line with
        surrounding whitespace removed, or ``"0000000000000000"`` when the
        file cannot be read, has no ``Serial`` line, or that line has no
        colon.
    """
    default_serial = "0000000000000000"
    try:
        with open(cpuinfo_path, "r") as cpuinfo:
            for line in cpuinfo:
                if not line.startswith("Serial"):
                    continue
                fields = line.split(":")
                # Only the first "Serial" line is considered; a malformed
                # line (no colon) falls back to the default.
                return fields[1].strip() if len(fields) > 1 else default_serial
    except (OSError, UnicodeError):
        # Best effort: an unreadable or undecodable file is not fatal, the
        # caller simply gets the placeholder serial.
        return default_serial
    return default_serial
def get_cpu_temperature() -> float:
    """Return the CPU temperature in degrees Celsius for compensation.

    The value is read from ``/sys/class/thermal/thermal_zone0/temp``,
    parsed as an integer and divided by 1000.
    """
    with open("/sys/class/thermal/thermal_zone0/temp", "r") as thermal_file:
        thousandths = int(thermal_file.read().strip())
    return thousandths / 1000.0
def get_enviro_readings(bme280: BME280, pms5003: PMS5003,
                        cpu_temps: list[float], factor: float) -> dict:
    """Take one sample from every Enviro+ sensor and return it as a dict.

    The BME280 temperature is compensated with the mean of the CPU
    temperature history (at most the last five samples). Values are
    returned in their natural units (temperature in °C, pressure in hPa,
    humidity in %, light in Lux, oxidised/reduced/nh3 in kΩ and
    particulates in µg/m³).

    ``cpu_temps`` is modified in place: the newest CPU temperature is
    appended and, if the list then holds more than five entries, the
    oldest one is dropped. Keep the same list between calls so the
    compensation is smoothed over time.
    """
    # Fold the newest CPU temperature into the rolling history
    cpu_temps.append(get_cpu_temperature())
    if len(cpu_temps) > 5:
        del cpu_temps[0]
    cpu_mean = sum(cpu_temps) / len(cpu_temps)

    # BME280: compensated temperature, then pressure and humidity
    sensor_temp = bme280.get_temperature()
    sample: dict[str, float] = {
        "temperature": sensor_temp - ((cpu_mean - sensor_temp) / factor),
        "pressure": bme280.get_pressure(),
        "humidity": bme280.get_humidity(),
    }

    # Light: when something is close to the proximity sensor report 1
    # instead of the lux value, to avoid huge spikes from a nearby hand.
    if ltr559.get_proximity() >= 10:
        lux = 1
    else:
        try:
            lux = ltr559.get_lux()
        except Exception:
            # In some rare cases get_lux() can throw a division by zero
            lux = 0
    sample["light"] = lux

    # Gas sensor: scale each channel by 1/1000
    gas_sample = gas.read_all()
    for key, channel in (("oxidised", "oxidising"),
                         ("reduced", "reducing"),
                         ("nh3", "nh3")):
        sample[key] = getattr(gas_sample, channel) / 1000.0

    # Particulates: a read timeout (e.g. sensor not connected) yields zeros
    try:
        pm_sample = pms5003.read()
        particulates = {
            "pm1": float(pm_sample.pm_ug_per_m3(1.0)),
            "pm25": float(pm_sample.pm_ug_per_m3(2.5)),
            "pm10": float(pm_sample.pm_ug_per_m3(10)),
        }
    except pmsReadTimeoutError:
        particulates = {"pm1": 0.0, "pm25": 0.0, "pm10": 0.0}
    sample.update(particulates)

    return sample
def create_display() -> st7735.ST7735:
    """Construct the ST7735 LCD driver, start it and return it."""
    lcd_settings = {
        "port": 0,
        "cs": 1,
        "dc": "GPIO9",
        "backlight": "GPIO12",
        "rotation": 270,
        "spi_speed_hz": 10_000_000,
    }
    lcd = st7735.ST7735(**lcd_settings)
    lcd.begin()
    return lcd
def main() -> None:
    """Show Enviro+ readings on the LCD and publish them over MQTT.

    Parses the command line, initialises the sensors, the display and the
    MQTT client, then loops until interrupted with Ctrl+C. Each iteration:

    * advances the displayed variable when the proximity sensor is
      "tapped" (debounced) or when ``--rotate-interval`` seconds elapsed;
    * reads all sensors once and draws the selected one;
    * publishes the full reading set as JSON when the publish interval
      elapsed and an MQTT connection was established at start-up.

    On exit the MQTT loop is stopped and the display is blanked. Errors
    other than ``KeyboardInterrupt`` propagate to the caller after the
    clean-up has run.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Display Enviro+ readings on the ST7735 LCD and publish them over MQTT."
        )
    )
    parser.add_argument(
        "--broker",
        default="localhost",
        type=str,
        help="MQTT broker IP or hostname"
    )
    parser.add_argument(
        "--port",
        default=1883,
        type=int,
        help="MQTT broker port (default: 1883)"
    )
    parser.add_argument(
        "--topic",
        default="enviroplus",
        type=str,
        help="MQTT topic to publish readings to"
    )
    parser.add_argument(
        "--username",
        default=None,
        type=str,
        help="Optional MQTT username"
    )
    parser.add_argument(
        "--password",
        default=None,
        type=str,
        help="Optional MQTT password"
    )
    parser.add_argument(
        "--rotate-interval",
        default=5,
        type=float,
        help="Seconds to show each sensor reading on the display"
    )
    parser.add_argument(
        "--publish-interval",
        default=None,
        type=float,
        help=(
            "Seconds between MQTT publishes. If omitted then publish on each"
            " display rotation."
        )
    )
    args = parser.parse_args()

    # Set up logging
    logging.basicConfig(
        format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
        level=logging.INFO,
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    logging.info("Starting Enviro+ display with MQTT publishing")

    # Initialise sensors
    bme280 = BME280()
    pms5003 = PMS5003()

    # Initialise display
    disp = create_display()
    WIDTH = disp.width
    HEIGHT = disp.height

    # Create canvas and font
    img = Image.new("RGB", (WIDTH, HEIGHT), color=(0, 0, 0))
    draw = ImageDraw.Draw(img)
    font_size = 20
    font: ImageFont.FreeTypeFont = ImageFont.truetype(UserFont, font_size)
    top_pos = 25  # the graph is drawn from this row down; text goes above it

    # Display order of the variables and their units
    variables = [
        "temperature", "pressure", "humidity", "light",
        "oxidised", "reduced", "nh3", "pm1", "pm25", "pm10"
    ]
    units: dict[str, str] = {
        "temperature": "°C",
        "pressure": "hPa",
        "humidity": "%",
        "light": "Lux",
        "oxidised": "kΩ",
        "reduced": "kΩ",
        "nh3": "kΩ",
        "pm1": "µg/m³",
        "pm25": "µg/m³",
        "pm10": "µg/m³",
    }
    # One history value per display column, used to scale the graph
    values: dict[str, list[float]] = {v: [1.0] * WIDTH for v in variables}

    # CPU temperature history for compensation
    cpu_temps: list[float] = [get_cpu_temperature()] * 5
    factor = 2.25  # compensation factor

    # MQTT client setup
    device_serial = get_serial_number()
    client_id = f"enviroplus-{device_serial}"
    mqtt_client = mqtt.Client(
        client_id=client_id,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    if args.username is not None:
        mqtt_client.username_pw_set(username=args.username, password=args.password)
    try:
        mqtt_client.connect(args.broker, port=args.port)
    except Exception as exc:
        logging.warning(
            "Unable to connect to MQTT broker at %s:%s: %s",
            args.broker, args.port, exc,
        )
        mqtt_client = None  # disable MQTT publishing
    if mqtt_client is not None:
        # Start the network loop in a background thread. This is required
        # when publishing asynchronously.
        mqtt_client.loop_start()

    # Publish interval: fall back to the rotation interval only when the
    # option was omitted (an explicit 0 means "publish every iteration").
    publish_interval: float = (
        args.rotate_interval
        if args.publish_interval is None
        else args.publish_interval
    )
    last_publish_time = time.time()
    last_rotate_time = time.time()
    mode = 0

    # Manual override state. This must live outside the loop: resetting
    # ``last_proximity_time`` on every iteration would defeat the debounce.
    proximity_threshold = 1500  # matches the original script
    proximity_delay = 0.5  # seconds between successive taps
    last_proximity_time = 0.0  # timestamp of the last tap

    def display_text(var: str, data: float, unit: str) -> None:
        """Render a coloured bar graph and text on the LCD for the given value."""
        # Scroll values array to maintain history
        values[var] = values[var][1:] + [data]
        vmin = min(values[var])
        vmax = max(values[var])
        # The +1 avoids a divide by zero when all values are equal
        denom = (vmax - vmin + 1)
        colours = [(v - vmin + 1) / denom for v in values[var]]
        message = f"{var[:4]}: {data:.1f} {unit}"
        logging.info(message)
        # Clear the image
        draw.rectangle((0, 0, WIDTH, HEIGHT), fill=(255, 255, 255))
        for i, c in enumerate(colours):
            # Map normalised values to a colour gradient from red to blue
            colour = (1.0 - c) * 0.6
            r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, 1.0, 1.0)]
            # Draw the bar and the line graph over it
            draw.rectangle((i, top_pos, i + 1, HEIGHT), fill=(r, g, b))
            line_y = HEIGHT - (top_pos + (c * (HEIGHT - top_pos))) + top_pos
            draw.rectangle((i, line_y, i + 1, line_y + 1), fill=(0, 0, 0))
        # Draw the text at the top in black
        draw.text((0, 0), message, font=font, fill=(0, 0, 0))
        disp.display(img)

    try:
        while True:
            now = time.time()

            # Manual override: detect a "tap" on the proximity sensor
            prox = ltr559.get_proximity()
            if prox > proximity_threshold and (now - last_proximity_time) > proximity_delay:
                mode = (mode + 1) % len(variables)
                last_proximity_time = now
                last_rotate_time = now  # restart the automatic rotation timer

            # Read all sensors once per loop iteration to ensure consistent data
            readings = get_enviro_readings(bme280, pms5003, cpu_temps, factor)

            # Automatic rotation after rotate_interval seconds
            if now - last_rotate_time >= args.rotate_interval:
                mode = (mode + 1) % len(variables)
                last_rotate_time = now

            current_var = variables[mode]
            display_text(current_var, readings[current_var], units[current_var])

            # Publish over MQTT if connected and interval elapsed
            if mqtt_client is not None and (now - last_publish_time) >= publish_interval:
                last_publish_time = now
                payload = readings.copy()
                payload["serial"] = device_serial
                try:
                    info = mqtt_client.publish(args.topic, json.dumps(payload), retain=True)
                except Exception as exc:
                    logging.warning("Failed to publish to MQTT: %s", exc)
                else:
                    # publish() reports problems such as a dropped
                    # connection through the result code, not an exception.
                    if info.rc == mqtt.MQTT_ERR_SUCCESS:
                        logging.info("Published to MQTT topic %s", args.topic)
                    else:
                        logging.warning(
                            "Failed to publish to MQTT: %s",
                            mqtt.error_string(info.rc),
                        )

            # Sleep for a short period to reduce CPU usage
            time.sleep(0.1)
    except KeyboardInterrupt:
        logging.info("Exiting on user request")
    finally:
        # No sys.exit() here: raising SystemExit from ``finally`` would
        # replace any in-flight exception and hide real failures behind a
        # zero exit status. Returning normally still exits with status 0.
        if mqtt_client is not None:
            mqtt_client.loop_stop()
            mqtt_client.disconnect()
        # Attempt to clear the display
        draw.rectangle((0, 0, WIDTH, HEIGHT), fill=(0, 0, 0))
        disp.display(img)
# Run the display/publish loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment