Last active
January 7, 2026 08:29
-
-
Save atanasyanew/8cbb71f2a960365da17038b64eb9fa21 to your computer and use it in GitHub Desktop.
RPI Pimoroni Enviro plus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Extended Enviro+ example for the Pimoroni Enviro+ board. | |
| This script is derived from Pimoroni's ``all-in-one.py`` example but has two | |
| important differences: | |
| 1. Rather than requiring you to wave your hand over the proximity/light | |
| sensor to cycle through each sensor reading, the display will | |
| automatically rotate between the built-in sensors at a configurable | |
| interval. | |
| 2. Each set of readings is published to an MQTT broker. You can point the | |
| broker at a local Mosquitto instance or any other MQTT server. The | |
| payload is a JSON dictionary containing the temperature, pressure, | |
| humidity, light level and gas/particulate readings along with the | |
| Raspberry Pi's serial number as ``serial``. This makes it easy to | |
| integrate the Enviro+ into a home automation system. | |
| Usage | |
| ----- | |
| .. code:: bash | |
| python3 all_in_one_mqtt.py --broker localhost --topic enviroplus \ | |
| --rotate-interval 10 --publish-interval 15 | |
| The MQTT parameters are optional. If you omit ``--broker`` the script will | |
| default to ``localhost`` on port ``1883``. ``--rotate-interval`` controls | |
| how long (in seconds) each sensor reading is shown on the LCD before | |
| advancing to the next. ``--publish-interval`` controls how often the | |
| script publishes readings to MQTT; by default it publishes on each | |
| display rotation. | |
| This script requires the ``paho-mqtt`` library to be installed. You can | |
| install it with ``pip3 install paho-mqtt``. | |
| """ | |
| import argparse | |
| import colorsys | |
| import json | |
| import logging | |
| import sys | |
| import time | |
| import st7735 | |
| try: | |
| # Transitional fix for breaking change in LTR559 | |
| from ltr559 import LTR559 # type: ignore | |
| ltr559 = LTR559() | |
| except ImportError: | |
| # Fall back to legacy import on older versions of enviroplus | |
| import ltr559 # type: ignore | |
| from bme280 import BME280 # type: ignore | |
| from pms5003 import PMS5003, ReadTimeoutError as pmsReadTimeoutError # type: ignore | |
| from enviroplus import gas # type: ignore | |
| from fonts.ttf import RobotoMedium as UserFont # type: ignore | |
| from PIL import Image, ImageDraw, ImageFont | |
| try: | |
| import paho.mqtt.client as mqtt # type: ignore | |
| except ImportError as e: | |
| print("The paho-mqtt package is required to publish sensor data.\n" | |
| "Install it with 'pip3 install paho-mqtt' before running this script.") | |
| raise | |
| # --------------------------------------------------------------------------- | |
| # Helper functions | |
| # --------------------------------------------------------------------------- | |
| def get_serial_number() -> str: | |
| """Return the Raspberry Pi serial number. | |
| If the serial cannot be determined the function returns a default | |
| string. This value is used as the MQTT client ID and published as | |
| ``serial`` in the payload. | |
| """ | |
| serial = "0000000000000000" | |
| try: | |
| with open("/proc/cpuinfo", "r") as f: | |
| for line in f: | |
| if line.startswith("Serial"): | |
| serial = line.split(":")[1].strip() | |
| break | |
| except Exception: | |
| # ignore and return default | |
| pass | |
| return serial | |
| def get_cpu_temperature() -> float: | |
| """Return the CPU temperature in degrees Celsius for compensation.""" | |
| with open("/sys/class/thermal/thermal_zone0/temp", "r") as f: | |
| temp_str = f.read().strip() | |
| return int(temp_str) / 1000.0 | |
| def get_enviro_readings(bme280: BME280, pms5003: PMS5003, | |
| cpu_temps: list[float], factor: float) -> dict: | |
| """Read all Enviro+ sensors and return a dictionary of values. | |
| The temperature is compensated using the CPU temperature averaged over | |
| the last five readings. Values are returned in their natural units | |
| (temperature in °C, pressure in hPa, humidity in %, light in Lux, | |
| oxidised/reduced/nh3 in kΩ and particulates in µg/m³). | |
| This function updates ``cpu_temps`` in place. You should persist the | |
| ``cpu_temps`` list between calls to smooth out CPU temperature jitter. | |
| """ | |
| readings: dict[str, float] = {} | |
| # Update CPU temperature history for compensation | |
| cpu_temp = get_cpu_temperature() | |
| cpu_temps.append(cpu_temp) | |
| if len(cpu_temps) > 5: | |
| cpu_temps.pop(0) | |
| avg_cpu_temp = sum(cpu_temps) / len(cpu_temps) | |
| # Temperature compensation for the BME280 sensor | |
| raw_temp = bme280.get_temperature() | |
| comp_temp = raw_temp - ((avg_cpu_temp - raw_temp) / factor) | |
| readings["temperature"] = comp_temp | |
| # Pressure and humidity | |
| readings["pressure"] = bme280.get_pressure() | |
| readings["humidity"] = bme280.get_humidity() | |
| # Light level. Use the raw lux value when the proximity sensor is not | |
| # obstructed; otherwise default to 1 to avoid huge spikes when your hand | |
| # is close to the sensor. | |
| proximity = ltr559.get_proximity() | |
| if proximity < 10: | |
| try: | |
| readings["light"] = ltr559.get_lux() | |
| except Exception: | |
| # In some rare cases get_lux() can throw a division by zero | |
| readings["light"] = 0 | |
| else: | |
| readings["light"] = 1 | |
| # Gas sensor readings | |
| gas_data = gas.read_all() | |
| readings["oxidised"] = gas_data.oxidising / 1000.0 | |
| readings["reduced"] = gas_data.reducing / 1000.0 | |
| readings["nh3"] = gas_data.nh3 / 1000.0 | |
| # Particulate readings from the PMS5003. The read can occasionally | |
| # time out if the sensor is not connected; handle gracefully. | |
| try: | |
| pm_values = pms5003.read() | |
| readings["pm1"] = float(pm_values.pm_ug_per_m3(1.0)) | |
| readings["pm25"] = float(pm_values.pm_ug_per_m3(2.5)) | |
| readings["pm10"] = float(pm_values.pm_ug_per_m3(10)) | |
| except pmsReadTimeoutError: | |
| # Assign 0 if the sensor isn’t available or times out | |
| readings["pm1"] = 0.0 | |
| readings["pm25"] = 0.0 | |
| readings["pm10"] = 0.0 | |
| return readings | |
| def create_display() -> st7735.ST7735: | |
| """Initialise and return an ST7735 display object.""" | |
| display = st7735.ST7735( | |
| port=0, | |
| cs=1, | |
| dc="GPIO9", | |
| backlight="GPIO12", | |
| rotation=270, | |
| spi_speed_hz=10_000_000 | |
| ) | |
| display.begin() | |
| return display | |
| def main() -> None: | |
| parser = argparse.ArgumentParser( | |
| description=( | |
| "Display Enviro+ readings on the ST7735 LCD and publish them over MQTT." | |
| ) | |
| ) | |
| parser.add_argument( | |
| "--broker", | |
| default="localhost", | |
| type=str, | |
| help="MQTT broker IP or hostname" | |
| ) | |
| parser.add_argument( | |
| "--port", | |
| default=1883, | |
| type=int, | |
| help="MQTT broker port (default: 1883)" | |
| ) | |
| parser.add_argument( | |
| "--topic", | |
| default="enviroplus", | |
| type=str, | |
| help="MQTT topic to publish readings to" | |
| ) | |
| parser.add_argument( | |
| "--username", | |
| default=None, | |
| type=str, | |
| help="Optional MQTT username" | |
| ) | |
| parser.add_argument( | |
| "--password", | |
| default=None, | |
| type=str, | |
| help="Optional MQTT password" | |
| ) | |
| parser.add_argument( | |
| "--rotate-interval", | |
| default=5, | |
| type=float, | |
| help="Seconds to show each sensor reading on the display" | |
| ) | |
| parser.add_argument( | |
| "--publish-interval", | |
| default=None, | |
| type=float, | |
| help=( | |
| "Seconds between MQTT publishes. If omitted then publish on each" | |
| " display rotation." | |
| ) | |
| ) | |
| args = parser.parse_args() | |
| # Set up logging | |
| logging.basicConfig( | |
| format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s", | |
| level=logging.INFO, | |
| datefmt="%Y-%m-%d %H:%M:%S" | |
| ) | |
| logging.info("Starting Enviro+ display with MQTT publishing") | |
| # Initialise sensors | |
| bme280 = BME280() | |
| pms5003 = PMS5003() | |
| # Initialise display | |
| disp = create_display() | |
| WIDTH = disp.width | |
| HEIGHT = disp.height | |
| # Create canvas and font | |
| img = Image.new("RGB", (WIDTH, HEIGHT), color=(0, 0, 0)) | |
| draw = ImageDraw.Draw(img) | |
| font_size = 20 | |
| font: ImageFont.FreeTypeFont = ImageFont.truetype(UserFont, font_size) | |
| top_pos = 25 | |
| # Pre-initialise the values dict for graph scaling | |
| variables = [ | |
| "temperature", "pressure", "humidity", "light", | |
| "oxidised", "reduced", "nh3", "pm1", "pm25", "pm10" | |
| ] | |
| units: dict[str, str] = { | |
| "temperature": "°C", | |
| "pressure": "hPa", | |
| "humidity": "%", | |
| "light": "Lux", | |
| "oxidised": "kΩ", | |
| "reduced": "kΩ", | |
| "nh3": "kΩ", | |
| "pm1": "µg/m³", | |
| "pm25": "µg/m³", | |
| "pm10": "µg/m³", | |
| } | |
| values: dict[str, list[float]] = {} | |
| for v in variables: | |
| values[v] = [1.0] * WIDTH | |
| # CPU temperature history for compensation | |
| cpu_temps: list[float] = [get_cpu_temperature()] * 5 | |
| factor = 2.25 # compensation factor | |
| # MQTT client setup | |
| device_serial = get_serial_number() | |
| client_id = f"enviroplus-{device_serial}" | |
| mqtt_client = mqtt.Client(client_id=client_id,callback_api_version=mqtt.CallbackAPIVersion.VERSION2) | |
| if args.username is not None: | |
| mqtt_client.username_pw_set(username=args.username, password=args.password) | |
| try: | |
| mqtt_client.connect(args.broker, port=args.port) | |
| except Exception as exc: | |
| logging.warning(f"Unable to connect to MQTT broker at {args.broker}:{args.port}: {exc}") | |
| mqtt_client = None # disable MQTT publishing | |
| if mqtt_client is not None: | |
| # Start the network loop in a background thread. This is required | |
| # when publishing asynchronously. | |
| mqtt_client.loop_start() | |
| # Determine publish interval | |
| # If not specified, publish on each rotation | |
| publish_interval: float = args.publish_interval or args.rotate_interval | |
| last_publish_time = time.time() | |
| last_rotate_time = time.time() | |
| mode = 0 | |
| def display_text(var: str, data: float, unit: str) -> None: | |
| """Render a coloured bar graph and text on the LCD for the given value.""" | |
| # Scroll values array to maintain history | |
| values[var] = values[var][1:] + [data] | |
| vmin = min(values[var]) | |
| vmax = max(values[var]) | |
| # Avoid divide by zero when all values are equal | |
| denom = (vmax - vmin + 1) | |
| colours = [(v - vmin + 1) / denom for v in values[var]] | |
| message = f"{var[:4]}: {data:.1f} {unit}" | |
| logging.info(message) | |
| # Clear the image | |
| draw.rectangle((0, 0, WIDTH, HEIGHT), fill=(255, 255, 255)) | |
| for i, c in enumerate(colours): | |
| # Map normalised values to a colour gradient from red to blue | |
| colour = (1.0 - c) * 0.6 | |
| r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, 1.0, 1.0)] | |
| # Draw the bar and the line graph over it | |
| draw.rectangle((i, top_pos, i + 1, HEIGHT), fill=(r, g, b)) | |
| line_y = HEIGHT - (top_pos + (c * (HEIGHT - top_pos))) + top_pos | |
| draw.rectangle((i, line_y, i + 1, line_y + 1), fill=(0, 0, 0)) | |
| # Draw the text at the top in black | |
| draw.text((0, 0), message, font=font, fill=(0, 0, 0)) | |
| disp.display(img) | |
| try: | |
| while True: | |
| now = time.time() | |
| # Manual override: detect a “tap” on the proximity sensor | |
| proximity_threshold = 1500 # matches the original script | |
| proximity_delay = 0.5 # seconds between successive taps | |
| last_proximity_time = 0.0 # timestamp of the last tap | |
| prox = ltr559.get_proximity() | |
| if prox > proximity_threshold and (now - last_proximity_time) > proximity_delay: | |
| mode = (mode + 1) % len(variables) | |
| last_proximity_time = now | |
| last_rotate_time = now # optional: reset the rotation timer | |
| # Read all sensors once per loop iteration to ensure consistent data | |
| readings = get_enviro_readings(bme280, pms5003, cpu_temps, factor) | |
| # Automatic rotation after rotate_interval seconds | |
| # Rotate the display when the interval has elapsed | |
| if now - last_rotate_time >= args.rotate_interval: | |
| mode = (mode + 1) % len(variables) | |
| last_rotate_time = now | |
| current_var = variables[mode] | |
| current_unit = units[current_var] | |
| current_value = readings[current_var] | |
| display_text(current_var, current_value, current_unit) | |
| # Publish over MQTT if connected and interval elapsed | |
| if mqtt_client is not None and (now - last_publish_time) >= publish_interval: | |
| last_publish_time = now | |
| payload = readings.copy() | |
| payload["serial"] = device_serial | |
| try: | |
| mqtt_client.publish(args.topic, json.dumps(payload), retain=True) | |
| logging.info(f"Published to MQTT topic {args.topic}") | |
| except Exception as exc: | |
| logging.warning(f"Failed to publish to MQTT: {exc}") | |
| # Sleep for a short period to reduce CPU usage | |
| time.sleep(0.1) | |
| except KeyboardInterrupt: | |
| logging.info("Exiting on user request") | |
| finally: | |
| # Clean up the display on exit | |
| if mqtt_client is not None: | |
| mqtt_client.loop_stop() | |
| mqtt_client.disconnect() | |
| # Attempt to clear the display | |
| draw.rectangle((0, 0, WIDTH, HEIGHT), fill=(0, 0, 0)) | |
| disp.display(img) | |
| sys.exit(0) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment