Last active
May 31, 2025 17:59
-
-
Save bbartling/88e95557b0b297bea721c0f285657d53 to your computer and use it in GitHub Desktop.
simple BAC0 scripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import BAC0 | |
| """ | |
| not tested yet | |
| """ | |
async def main():
    """Mirror an MSTP sensor reading to a BACnet server once a minute.

    Each cycle reads analogInput 2 from the MSTP device (hardware address 2
    on MSTP network 12345), writes that reading minus 10 to analogValue 1 on
    the Raspberry Pi BACnet server, then reads back analogValue 2 and
    analogValue 3 from the same server. Any exception raised during a cycle
    is printed and the loop carries on after the usual delay.
    """
    mstp_sensor_request = '12345:2 analogInput 2 presentValue'
    bacnet = BAC0.lite()

    async def logged_read(request, label):
        # Announce the request, perform it, and echo the result.
        print(f"Executing read: {request}")
        value = await bacnet.read(request)
        print(f"{label}: {value}")
        return value

    while True:
        try:
            sensor = await logged_read(mstp_sensor_request, "Sensor value")

            # Push the adjusted (-10) reading to the server.
            adjusted_write = (
                f'192.168.0.101/24 analogValue 1 presentValue {sensor - 10}'
            )
            print(f"Executing write: {adjusted_write}")
            await bacnet.write(adjusted_write)

            # Calculated future data, then rate of change.
            await logged_read(
                '192.168.0.101/24 analogValue 2 presentValue',
                "Future data value",
            )
            await logged_read(
                '192.168.0.101/24 analogValue 3 presentValue',
                "Rate of change value",
            )
        except Exception as err:
            print(f"An error occurred: {err}")

        # Non-blocking pause between cycles.
        await asyncio.sleep(60)


# Script entry point: run until the user presses Ctrl-C.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script interrupted by user.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import BAC0 | |
async def main():
    """Discover devices and enforce min/max present-value limits on them.

    Runs a global who-is, keeps the devices whose instance ID is below 100
    (JCI boxes), and for each one makes sure ``minPresValue`` and
    ``maxPresValue`` of analogValue 1103 equal the configured limits. A limit
    is only written when the value read back differs, and every write is
    confirmed with a follow-up read. Errors on one device are printed and do
    not stop processing of the remaining devices.
    """
    bacnet = BAC0.lite()
    await asyncio.sleep(2)  # Let BAC0 settle/startup

    # Discover devices
    print("Starting whois discovery...")
    devices = await bacnet.whois(global_broadcast=True)

    device_mapping = {}
    # Guard against a None result so an empty network does not raise.
    for device in devices or []:
        if isinstance(device, tuple):
            device_mapping[device[1]] = device[0]
            print(f"Detected device {device[1]} with address {device[0]}")
    print(device_mapping)
    print(f"{len(device_mapping)} devices discovered on network.")

    # Filter devices with instance ID less than 100 (JCI boxes)
    addresses = [
        address
        for bacnet_inst, address in device_mapping.items()
        if bacnet_inst < 100
    ]
    print("Filtered addresses:", addresses)

    # Object settings
    object_type = 'analogValue'
    object_instance = '1103'
    # NOTE(review): the original passed the string 'default' as the priority,
    # producing "... <value> - default". BAC0's write syntax documents the
    # priority as an optional number, so the suffix is now only added when a
    # numeric priority is configured here.
    priority = None
    limits = (
        ("Min", "minPresValue", 55.0),
        ("Max", "maxPresValue", 85.0),
    )

    async def enforce_limit(address, label, prop, target):
        # Read one limit property and rewrite it only when it is off-target.
        read_request = f'{address} {object_type} {object_instance} {prop}'
        current = await bacnet.read(read_request)
        print(f"Current {label}: {current}")
        if current == target:
            print(f"{label} already correct on device {address}")
            return

        write_request = f'{read_request} {target}'
        if priority is not None:
            write_request += f' - {priority}'
        await bacnet.write(write_request)
        print(f"Wrote {label}: {target}")

        # Confirm
        confirmed = await bacnet.read(read_request)
        print(f"Confirmed {label}: {confirmed}")

    for address in addresses:
        try:
            print(f"Processing device: {address}")
            for label, prop, target in limits:
                await enforce_limit(address, label, prop, target)
            print("*********************************")
        except Exception as error:
            print(f"OOF error on device {address}: {error}")

    print("ALL DIGITY DONE!!!")


# Entry point
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script interrupted by user.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import BAC0 | |
async def main():
    """Release a priority-10 write on analogOutput 2014 across devices.

    Performs a global who-is, keeps the devices whose instance ID is below
    100 (e.g. JCI VAV boxes), and writes ``null`` at priority 10 to the
    present value of analogOutput 2014 on each of them. A failure on one
    device is printed and the remaining devices are still processed.
    """
    # ---- Config ----
    target_type = 'analogOutput'
    target_instance = '2014'
    release_priority = '10'  # Priority to release

    bacnet = BAC0.lite()
    await asyncio.sleep(2)  # Allow BACnet app to stabilize

    # Discover devices
    print("Starting whois discovery...")
    discovered = await bacnet.whois(global_broadcast=True)

    # Map device instance -> address, skipping anything that is not a tuple.
    instance_to_address = {}
    for entry in discovered:
        if not isinstance(entry, tuple):
            continue
        dev_address, dev_instance = entry[0], entry[1]
        instance_to_address[dev_instance] = dev_address
        print(f"Detected device {dev_instance} with address {dev_address}")
    print(instance_to_address)
    print(f"{len(instance_to_address)} devices discovered on network.")

    # Keep only the low instance IDs (< 100).
    addresses = [
        dev_address
        for dev_instance, dev_address in instance_to_address.items()
        if dev_instance < 100
    ]
    print("Filtered addresses:", addresses)

    # Release the override on every selected device.
    for address in addresses:
        try:
            print(f"Releasing setpoint on DEVICE: {address}")
            await bacnet.write(
                f'{address} {target_type} {target_instance} presentValue null - {release_priority}'
            )
            print(f"Released {target_instance} at priority {release_priority} on {address}")
        except Exception as error:
            print(f"Error on device {address}: {error}")

    print("ALL DIGITY DONE!!!")


# Script entry point: Ctrl-C exits with a short message.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script interrupted by user.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from datetime import datetime, timedelta | |
| import BAC0 | |
async def main():
    """Run a timed override event and always release it afterwards.

    Writes 0 at priority 10 to analogValue 302 on device ``12345:2``, then
    checks analogInput 2 once a minute for up to 30 minutes. The override is
    released (``null`` written at the same priority) as soon as the sensor
    reads 80.0 or higher, or when the time window expires.

    The release is also attempted from a ``finally`` block, so an unexpected
    error or an interruption during the event does not leave the override in
    place. That last-chance release is best effort: it can only succeed while
    the BACnet stack is still able to send.
    """
    bacnet = BAC0.lite()

    address = '12345:2'
    sensor_object_type = 'analogInput'
    sensor_object_instance = '2'
    cmd_object_type = 'analogValue'
    cmd_object_instance = '302'
    priority = 10

    # ADR event settings
    hi_zone_temp_release = 80.0
    duration_minutes = 30
    check_interval_seconds = 60

    # Request strings are fixed for the whole event, so build them once.
    cmd_point = f'{address} {cmd_object_type} {cmd_object_instance} presentValue'
    override_cmd = f'{cmd_point} 0 - {priority}'
    release_cmd = f'{cmd_point} null - {priority}'
    read_vals = f'{address} {sensor_object_type} {sensor_object_instance} presentValue'

    start_time = datetime.now()
    end_time = start_time + timedelta(minutes=duration_minutes)

    print("ON START: WRITING OVERRIDE TO THE BAS!!!")
    print(f"Executing write: {override_cmd}")
    await bacnet.write(override_cmd)
    print('EVENT START OVERRIDE SUCCESS!!!')

    async def half_time_job():
        """Check the sensor; release the override and return True if too hot."""
        print("Checking BAS sensor!!!!")
        try:
            check = await bacnet.read(read_vals)
        except Exception as error:
            # A failed read must not abort the event; retry on the next cycle.
            print(f"Sensor read failed: {error}")
            return False
        print(f"Sensor value is: {check}")

        # A missing or non-numeric reading cannot be compared to the limit.
        if not isinstance(check, (int, float)):
            return False

        if check >= hi_zone_temp_release:
            print(f"Release triggered due to high temp: {release_cmd}")
            await bacnet.write(release_cmd)
            print('Release triggered successfully!!!')
            return True  # Indicate release was done
        return False  # No release yet

    # Scheduler loop (every minute)
    print("Scheduling periodic HALF TIME CHECK every 1 minute!!!!")
    released = False
    try:
        while datetime.now() < end_time:
            released = await half_time_job()
            if released:
                break  # Nothing left to wait for once released.
            remaining = (end_time - datetime.now()).total_seconds()
            if remaining <= 0:
                break
            # Never sleep past the end of the event window.
            await asyncio.sleep(min(check_interval_seconds, remaining))
        if not released:
            print("HALF TIME EVENT COMPLETED. Time Expired, Releasing Override!")
    finally:
        # Final release if not already done; also reached on error or
        # cancellation so the override is not left behind.
        if not released:
            print(f"End EVENT RELEASE: {release_cmd}")
            await bacnet.write(release_cmd)
            print('End EVENT RELEASE SUCCESS!!!')


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script interrupted by user.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import BAC0 | |
async def main():
    """Print the present value of analogInput 2 on 192.168.0.22 every minute.

    A failed read is reported and retried on the next cycle instead of
    terminating the polling loop.
    """
    bacnet = BAC0.lite()

    address = "192.168.0.22"
    object_type = "analogInput"
    object_instance = "2"
    # The request never changes, so build it once outside the loop.
    request = f"{address} {object_type} {object_instance} presentValue"

    while True:
        try:
            temp = await bacnet.read(request)  # read is a coroutine; await it
            print(f"Temp is {temp}")
        except Exception as error:
            print(f"An error occurred: {error}")
        await asyncio.sleep(60)  # asyncio-friendly sleep


# Run the async main
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script interrupted by user.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import requests | |
| import BAC0 | |
async def main():
    """Publish the weather.gov forecast temperature to a BACnet point.

    Once a minute, looks up the forecast for the configured latitude and
    longitude via api.weather.gov, takes the temperature of the first
    forecast period, and writes it at priority 10 to analogValue 1 on
    192.168.0.101/24. Lookup and write failures are printed and the loop
    continues with the next cycle.
    """
    bacnet = BAC0.lite()

    address = "192.168.0.101/24"
    object_type = "analogValue"
    object_instance = "1"
    priority = 10  # BACnet priority

    # Replace with the actual latitude and longitude
    location = '46.7867,-92.1005'

    def get_current_temperature(location):
        """Return the first forecast period's temperature, or None on failure.

        This function blocks on HTTP requests, so it is meant to be called
        from a worker thread rather than directly on the event loop.
        """
        points_url = f"https://api.weather.gov/points/{location}"
        try:
            response = requests.get(points_url, timeout=10)
            if response.status_code != 200:
                print("Failed to retrieve location data.")
                return None
            forecast_url = response.json()["properties"]["forecast"]

            response = requests.get(forecast_url, timeout=10)
            if response.status_code != 200:
                print("Failed to retrieve forecast data.")
                return None
            return response.json()["properties"]["periods"][0]["temperature"]
        except (
            requests.RequestException,
            KeyError,
            IndexError,
            TypeError,
            ValueError,
        ) as error:
            # Network trouble or an unexpected payload shape: report it as
            # "no data" so the polling loop keeps running.
            print(f"Weather lookup failed: {error}")
            return None

    loop = asyncio.get_running_loop()
    while True:
        # Run the blocking HTTP lookup in the default executor so the event
        # loop is not stalled while waiting on the network.
        temperature = await loop.run_in_executor(
            None, get_current_temperature, location
        )
        if temperature is not None:
            print(f"Current temperature for the location: {temperature}°F")
            # Write temperature to BACnet device
            write_vals = f'{address} {object_type} {object_instance} presentValue {temperature} - {priority}'
            try:
                await bacnet.write(write_vals)
            except Exception as error:
                print(f"BACnet write failed: {error}")
        else:
            print("Temperature data not available.")
        await asyncio.sleep(60)  # Non-blocking sleep


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Script interrupted by user.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment